[ https://issues.apache.org/jira/browse/KAFKA-1887?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14329318#comment-14329318 ]
Sriharsha Chintalapani commented on KAFKA-1887:
-----------------------------------------------
This can be fixed by moving KafkaHealthcheck.shutdown() after
controller.shutdown(), as suggested by [~nehanarkhede] and [~gwenshap]. We also
need to move kafkaHealthcheck.startup() before controller.startup(); otherwise
we see the same error in state-change.log after the broker starts.
The patch below causes the unit test run time to go from 9 min 30 sec to 15 min
on my machine, and also causes an intermittent failure in
ProducerFailureHandlingTest.testNotEnoughReplicasAfterBrokerShutdown, where
producer.send().get() gets a NotEnoughReplicasAfterAppendException instead of a
NotEnoughReplicasException (probably not related to this patch; a possible
test-side workaround is sketched after the patch). I am looking into the test
slowness with the patch, but if you have any idea or fix for this, please go
ahead and take the jira; I don't want to hold up the 0.8.2.1 release. I'll
update the jira as soon as I have a fix.
{code}
+    /* tell everyone we are alive */
+    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
+    kafkaHealthcheck.startup()
+
     /* start kafka controller */
     kafkaController = new KafkaController(config, zkClient, brokerState)
     kafkaController.startup()
@@ -152,10 +156,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
     topicConfigManager = new TopicConfigManager(zkClient, logManager)
     topicConfigManager.startup()
-    /* tell everyone we are alive */
-    kafkaHealthcheck = new KafkaHealthcheck(config.brokerId, config.advertisedHostName, config.advertisedPort, config.zkSessionTimeoutMs, zkClient)
-    kafkaHealthcheck.startup()
-
     /* register broker metrics */
     registerStats()
@@ -310,8 +310,6 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
       if (canShutdown) {
         Utils.swallow(controlledShutdown())
         brokerState.newState(BrokerShuttingDown)
-        if(kafkaHealthcheck != null)
-          Utils.swallow(kafkaHealthcheck.shutdown())
         if(socketServer != null)
           Utils.swallow(socketServer.shutdown())
         if(requestHandlerPool != null)
@@ -329,6 +327,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime) extends Logg
         Utils.swallow(consumerCoordinator.shutdown())
         if(kafkaController != null)
           Utils.swallow(kafkaController.shutdown())
+        if(kafkaHealthcheck != null)
+          Utils.swallow(kafkaHealthcheck.shutdown())
         if(zkClient != null)
           Utils.swallow(zkClient.close())
{code}
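A side note on the intermittent failure: depending on timing, the leader can
append the record to its local log before it notices that the ISR has shrunk
below min.insync.replicas, in which case the send fails with
NotEnoughReplicasAfterAppendException rather than NotEnoughReplicasException.
A minimal sketch of a tolerant assertion (the producer and record names here
are placeholders standing in for the test's own setup, not the test's actual
variables):
{code}
import java.util.concurrent.ExecutionException
import org.apache.kafka.common.errors.{NotEnoughReplicasAfterAppendException, NotEnoughReplicasException}
import org.junit.Assert.fail

// Accept either exception as the cause of the failed send: which one we get
// depends on whether the leader appended the record before it observed the
// shrunken ISR. `producer` and `record` are placeholders for the test's setup.
try {
  producer.send(record).get()
  fail("Expected the send to fail with fewer live brokers than min.insync.replicas")
} catch {
  case e: ExecutionException =>
    e.getCause match {
      case _: NotEnoughReplicasException | _: NotEnoughReplicasAfterAppendException => // expected
      case cause => throw cause
    }
}
{code}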
> controller error message on shutting the last broker
> ----------------------------------------------------
>
> Key: KAFKA-1887
> URL: https://issues.apache.org/jira/browse/KAFKA-1887
> Project: Kafka
> Issue Type: Bug
> Components: core
> Reporter: Jun Rao
> Assignee: Sriharsha Chintalapani
> Priority: Minor
> Fix For: 0.8.3
>
>
> We always see the following error in state-change log on shutting down the last broker.
> [2015-01-20 13:21:04,036] ERROR Controller 0 epoch 3 initiated state change for partition [test,0] from OfflinePartition to OnlinePartition failed (state.change.logger)
> kafka.common.NoReplicaOnlineException: No replica for partition [test,0] is alive. Live brokers are: [Set()], Assigned replicas are: [List(0)]
>         at kafka.controller.OfflinePartitionLeaderSelector.selectLeader(PartitionLeaderSelector.scala:75)
>         at kafka.controller.PartitionStateMachine.electLeaderForPartition(PartitionStateMachine.scala:357)
>         at kafka.controller.PartitionStateMachine.kafka$controller$PartitionStateMachine$$handleStateChange(PartitionStateMachine.scala:206)
>         at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:120)
>         at kafka.controller.PartitionStateMachine$$anonfun$triggerOnlinePartitionStateChange$3.apply(PartitionStateMachine.scala:117)
>         at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
>         at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
>         at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
>         at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
>         at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
>         at kafka.controller.PartitionStateMachine.triggerOnlinePartitionStateChange(PartitionStateMachine.scala:117)
>         at kafka.controller.KafkaController.onBrokerFailure(KafkaController.scala:446)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ReplicaStateMachine.scala:373)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1$$anonfun$apply$mcV$sp$1.apply(ReplicaStateMachine.scala:359)
>         at kafka.metrics.KafkaTimer.time(KafkaTimer.scala:33)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(ReplicaStateMachine.scala:358)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener$$anonfun$handleChildChange$1.apply(ReplicaStateMachine.scala:357)
>         at kafka.utils.Utils$.inLock(Utils.scala:535)
>         at kafka.controller.ReplicaStateMachine$BrokerChangeListener.handleChildChange(ReplicaStateMachine.scala:356)
>         at org.I0Itec.zkclient.ZkClient$7.run(ZkClient.java:568)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)