commit missing code
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/033872b3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/033872b3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/033872b3

Branch: refs/heads/trunk
Commit: 033872b316fd5a68d7463138d8199fb5d821f41b
Parents: a4cd17a
Author: Sriram Subramanian <[email protected]>
Authored: Tue Nov 19 17:38:13 2013 -0800
Committer: Sriram Subramanian <[email protected]>
Committed: Tue Nov 19 17:38:13 2013 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/controller/KafkaController.scala | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/033872b3/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index 476ed86..e2ad682 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -114,7 +114,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
     config.brokerId)
   // have a separate scheduler for the controller to be able to start and stop independently of the
   // kafka server
-  private val controllerScheduler = new KafkaScheduler(1)
+  private val autoRebalanceScheduler = new KafkaScheduler(1)
   val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext)
   private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
   private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
@@ -255,8 +255,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
       if (config.autoLeaderRebalanceEnable) {
         info("starting the partition rebalance scheduler")
-        controllerScheduler.startup()
-        controllerScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
+        autoRebalanceScheduler.startup()
+        autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance,
           5, config.leaderImbalanceCheckIntervalSeconds, TimeUnit.SECONDS)
       }
     }
@@ -502,7 +502,8 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
       isRunning = false
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()
-      controllerScheduler.shutdown()
+      if (config.autoLeaderRebalanceEnable)
+        autoRebalanceScheduler.shutdown()
       if(controllerContext.controllerChannelManager != null) {
         controllerContext.controllerChannelManager.shutdown()
         controllerContext.controllerChannelManager = null
