Repository: kafka
Updated Branches:
  refs/heads/0.8.1 dd08538a4 -> 2b6375b61

KAFKA-1358 Broker throws exception when reconnecting to zookeeper; reviewed by Neha Narkhede

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2b6375b6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2b6375b6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2b6375b6

Branch: refs/heads/0.8.1
Commit: 2b6375b61c0137cdff4b7c25967405f6f521489a
Parents: dd08538
Author: Timothy Chen <[email protected]>
Authored: Wed Apr 2 19:09:44 2014 -0700
Committer: Neha Narkhede <[email protected]>
Committed: Wed Apr 2 19:10:00 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/controller/KafkaController.scala | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/2b6375b6/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d142f8c..6de8713 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -338,11 +338,16 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
   def onControllerResignation() {
     inLock(controllerContext.controllerLock) {
       Utils.unregisterMBean(KafkaController.MBeanName)
-      deleteTopicManager.shutdown()
+
+      if(deleteTopicManager != null)
+        deleteTopicManager.shutdown()
+
       partitionStateMachine.shutdown()
       replicaStateMachine.shutdown()
+
       if(config.autoLeaderRebalanceEnable)
         autoRebalanceScheduler.shutdown()
+
       if(controllerContext.controllerChannelManager != null) {
         controllerContext.controllerChannelManager.shutdown()
         controllerContext.controllerChannelManager = null
@@ -1058,7 +1063,6 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg
    */
   @throws(classOf[Exception])
   def handleNewSession() {
-    info("ZK expired; shut down all controller components and try to re-elect")
     inLock(controllerContext.controllerLock) {
       onControllerResignation()
       controllerElector.elect
