[ 
https://issues.apache.org/jira/browse/KAFKA-2818?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew Bruce updated KAFKA-2818:
---------------------------------
    Status: Patch Available  (was: Open)

diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 7c03a24..a48ffb2 100755
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -278,6 +278,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: 
ZkUtils, val brokerStat
                       error("Forcing the controller to resign")
                       brokerRequestBatch.clear()
                       controllerElector.resign()
+                      //Run the Resignation callback directly as it doesn't 
get called after the exception is propogated
+                      onControllerResignation()
 
                       throw e
                     }
@@ -913,6 +915,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: 
ZkUtils, val brokerStat
             error("Forcing the controller to resign")
             brokerRequestBatch.clear()
             controllerElector.resign()
+           //Run the Resignation callback directly as it doesn't get called 
after the exception is propogated
+            onControllerResignation()
 
             throw e
           }
@@ -1033,6 +1037,8 @@ class KafkaController(val config : KafkaConfig, zkUtils: 
ZkUtils, val brokerStat
         error("Forcing the controller to resign")
         brokerRequestBatch.clear()
         controllerElector.resign()
+        //Run the Resignation callback directly as it doesn't get called after 
the exception is propogated
+        onControllerResignation()
 
         throw e
       }
diff --git a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala 
b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
index c6f80ac..d4d1f50 100755
--- a/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
+++ b/core/src/main/scala/kafka/controller/TopicDeletionManager.scala
@@ -106,7 +106,7 @@ class TopicDeletionManager(controller: KafkaController,
    */
   def shutdown() {
     // Only allow one shutdown to go through
-    if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) {
+    if (isDeleteTopicEnabled && deleteTopicsThread != null && 
deleteTopicsThread.initiateShutdown()) {
       // Resume the topic deletion so it doesn't block on the condition
       resumeTopicDeletionThread()
       // Await delete topic thread to exit


> Clean up Controller Object on forced Resignation
> ------------------------------------------------
>
>                 Key: KAFKA-2818
>                 URL: https://issues.apache.org/jira/browse/KAFKA-2818
>             Project: Kafka
>          Issue Type: Bug
>          Components: controller
>    Affects Versions: 0.9.0.0
>            Reporter: Matthew Bruce
>            Assignee: Neha Narkhede
>            Priority: Minor
>         Attachments: KAFKA-2818.patch
>
>
> Currently, if the controller does a forced resignation (if an exception is
> caught during updateLeaderEpochAndSendRequest, sendUpdateMetadataRequest or
> shutdownBroker), the ZooKeeper resignation callback function
> onControllerResignation doesn't get a chance to execute, which leaves some
> artifacts lying around. In particular, the Sensors don't get cleaned up, and
> if the Kafka broker ever gets re-elected as Controller it will fail because
> some metrics already exist. An error and stack trace of such an event are
> shown below.
> A forced resignation can be induced fairly easily with a misconfiguration in
> broker.properties: configure only SASL_PLAINTEXT listeners and set
> inter.broker.protocol.version=0.8.2.X, as shown below.
> {code}
> listeners=SASL_PLAINTEXT://<HOST FQDN>:9092
> inter.broker.protocol.version=0.8.2.X
> security.inter.broker.protocol=SASL_PLAINTEXT
> {code}
> {code}
> [2015-11-09 16:33:47,510] ERROR Error while electing or becoming leader on 
> broker 182050300 (kafka.server.ZookeeperLeaderElector)
> java.lang.IllegalArgumentException: A metric named 'MetricName 
> [name=connection-close-rate, group=controller-channel-metrics, 
> description=Connections closed per second in the window., 
> tags={broker-id=182050300}]' already exists, can't register another one.
>         at 
> org.apache.kafka.common.metrics.Metrics.registerMetric(Metrics.java:285)
>         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:177)
>         at org.apache.kafka.common.metrics.Sensor.add(Sensor.java:162)
>         at 
> org.apache.kafka.common.network.Selector$SelectorMetrics.<init>(Selector.java:578)
>         at org.apache.kafka.common.network.Selector.<init>(Selector.java:112)
>         at 
> kafka.controller.ControllerChannelManager.kafka$controller$ControllerChannelManager$$addNewBroker(ControllerChannelManager.scala:91)
>         at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43)
>         at 
> kafka.controller.ControllerChannelManager$$anonfun$1.apply(ControllerChannelManager.scala:43)
>         at scala.collection.immutable.Set$Set1.foreach(Set.scala:74)
>         at 
> kafka.controller.ControllerChannelManager.<init>(ControllerChannelManager.scala:43)
>         at 
> kafka.controller.KafkaController.startChannelManager(KafkaController.scala:819)
>         at 
> kafka.controller.KafkaController.initializeControllerContext(KafkaController.scala:747)
>         at 
> kafka.controller.KafkaController.onControllerFailover(KafkaController.scala:330)
>         at 
> kafka.controller.KafkaController$$anonfun$1.apply$mcV$sp(KafkaController.scala:163)
>         at 
> kafka.server.ZookeeperLeaderElector.elect(ZookeeperLeaderElector.scala:84)
>         at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply$mcZ$sp(ZookeeperLeaderElector.scala:146)
>         at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
>         at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener$$anonfun$handleDataDeleted$1.apply(ZookeeperLeaderElector.scala:141)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>         at 
> kafka.server.ZookeeperLeaderElector$LeaderChangeListener.handleDataDeleted(ZookeeperLeaderElector.scala:141)
>         at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:823)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to