[ https://issues.apache.org/jira/browse/KAFKA-2818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthew Bruce updated KAFKA-2818: --------------------------------- Status: Patch Available (was: Open) diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 7c03a24..a48ffb2 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -278,6 +278,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat error("Forcing the controller to resign") brokerRequestBatch.clear() controllerElector.resign() + //Run the Resignation callback directly as it doesn't get called after the exception is propogated + onControllerResignation() throw e } @@ -913,6 +915,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat error("Forcing the controller to resign") brokerRequestBatch.clear() controllerElector.resign() + //Run the Resignation callback directly as it doesn't get called after the exception is propogated + onControllerResignation() throw e } @@ -1033,6 +1037,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat error("Forcing the controller to resign") brokerRequestBatch.clear() controllerElector.resign() + //Run the Resignation callback directly as it doesn't get called after the exception is propogated + onControllerResignation() throw e } diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala index c6f80ac..d4d1f50 100755 --- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala +++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala @@ -106,7 +106,7 @@ class TopicDeletionManager(controller: KafkaController, */ def shutdown() { // Only allow one shutdown to go through - if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) { + if (isDeleteTopicEnabled && deleteTopicsThread != null && 
deleteTopicsThread.initiateShutdown()) { // Resume the topic deletion so it doesn't block on the condition resumeTopicDeletionThread() // Await delete topic thread to exit > Clean up Controller Object on forced Resignation > ------------------------------------------------ > > Key: KAFKA-2818 > URL: https://issues.apache.org/jira/browse/KAFKA-2818 > Project: Kafka > Issue Type: Bug > Components: controller > Affects Versions: 0.9.0.0 > Reporter: Matthew Bruce > Assignee: Neha Narkhede > Priority: Minor > Attachments: KAFKA-2818.patch > > > Currently, if the controller does a forced resignation (if an exception is > caught during updateLeaderEpochAndSendRequest, sendUpdateMetadataRequest or > shutdownBroker), the Zookeeper resignation callback function > onControllerResignation doesn't get a chance to execute, which leaves some > artifacts lying around. In particular the Sensors don't get cleaned up and > if the Kafka broker ever gets re-elected as Controller it will fail due to > some metrics already existing. An Error and stack trace of such an event is > below. > A forced resignation situation can be induced with a mis-config in > broker.properties fairly easily, by setting only SASL_PLAINTEXT listeners and > setting inter.broker.protocol.version=0.8.2.X > {code} > listeners=SASL_PLAINTEXT://<HOST FQDN>:9092 > inter.broker.protocol.version=0.8.2.X > security.inter.broker.protocol=SASL_PLAINTEXT > {code} > {code} > [2015-11-09 16:33:47,510] ERROR Error while electing or becoming leader on > broker 182050300 (kafka.server.ZookeeperLeaderElector) > java.lang.IllegalArgumentException: A metric named 'MetricName > [name=connection-close-rate, group=controller-channel-metrics, > description=Connections closed per second in the window., > tags={broker-id=182050300}]' already exists, can't register another one. 
> at > org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177) > at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162) > at > org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:578) > at org.apache.kafka.common.network.Selector.<init>(Selector.java:112) > at > kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:91) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43) > at > kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43) > at scala.collection.immutable.Set$Set1.foreach(Set.scala:74) > at > kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:43) > at > kafka.controller.KafkaController.startChannelManager(KafkaController.scala:819) > at > kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:747) > at > kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:330) > at > kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:163) > at > kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141) > at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262) > at > kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141) > at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823) > at 
org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71) > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)