[ https://issues.apache.org/jira/browse/KAFKA-3173?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15186928#comment-15186928 ]
Flavio Junqueira commented on KAFKA-3173:
-----------------------------------------

There is a race in the controller failover that can be causing this. Here is what we have in {{KafkaController.onControllerFailover}}:

{noformat}
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
initializeControllerContext()
replicaStateMachine.startup()
partitionStateMachine.startup()
{noformat}

Both the partition state machine and the replica state machine register ZooKeeper listeners before they start up. In the partition state machine, the two methods that invoke {{ControllerBrokerRequestBatch.newBatch()}} are {{triggerOnlinePartitionStateChange}} and {{handleStateChanges}}. Both invoke a private method that checks {{hasStarted}} and throws an exception if the state machine has not started, and {{hasStarted}} is set to true only when {{startup}} executes. Consequently, if we register the listener before starting the state machine and an event comes through, we leave the batch dirty, which causes the exception in the description: both {{triggerOnlinePartitionStateChange}} and {{handleStateChanges}} simply log the error and move on. The replica state machine has the same issue.

I believe the right execution order in {{onControllerFailover}} should be:

{noformat}
partitionStateMachine.startup()
replicaStateMachine.startup()
partitionStateMachine.registerListeners()
replicaStateMachine.registerListeners()
initializeControllerContext()
{noformat}

There could be other sources, but right now this looks like a clear one to me.

Also, something that looks really bad in many places in the controller code is that if there is an error processing partition or replica changes, the code simply logs and moves on. Instead, I believe it should either recover from the error or resign as controller; we can't simply skip an update.
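To make the race concrete, here is a minimal Scala sketch. Everything in it is hypothetical and heavily simplified. {{SimplifiedRequestBatch}} stands in for {{ControllerBrokerRequestBatch}}, {{SimplifiedStateMachine}} stands in for the partition and replica state machines, and a plain list of callbacks stands in for the ZooKeeper listeners. To make the batch end up dirty, the sketch assumes the {{hasStarted}} check fails after a request has been queued. That is an assumption about the control flow, not Kafka's exact code, and {{initializeControllerContext()}} is not modeled.

```scala
import scala.collection.mutable

// Hypothetical stand-in for ControllerBrokerRequestBatch: it only models the
// invariant that a new batch may be started only when the previous one is empty.
class SimplifiedRequestBatch {
  private val pending = mutable.Buffer.empty[String]

  def newBatch(): Unit =
    if (pending.nonEmpty)
      throw new IllegalStateException(
        s"batch is not empty while creating a new one; ${pending.mkString(", ")} might be lost")

  def add(request: String): Unit = pending += request

  // Stands in for sending the queued requests to brokers, which empties the batch.
  def sendAndClear(): Unit = pending.clear()
}

// Hypothetical stand-in for PartitionStateMachine / ReplicaStateMachine.
class SimplifiedStateMachine(errorLog: mutable.Buffer[String]) {
  private val batch = new SimplifiedRequestBatch
  @volatile private var hasStarted = false

  def startup(): Unit = hasStarted = true

  def handleStateChanges(partitions: Seq[String]): Unit =
    try {
      batch.newBatch()
      partitions.foreach(p => batch.add(s"LeaderAndIsr($p)"))
      // ASSUMPTION: the hasStarted check fails after requests were queued,
      // so nothing is sent and the batch is left dirty.
      if (!hasStarted) throw new IllegalStateException("state machine has not started")
      batch.sendAndClear()
    } catch {
      // Log and move on: the failed update is skipped and the batch is not reset.
      case e: IllegalStateException => errorLog += s"ERROR ${e.getMessage}"
    }
}

object FailoverRaceDemo {
  /** Simulates a controller failover and returns the errors that were logged. */
  def run(listenersFirst: Boolean): Seq[String] = {
    val errorLog = mutable.Buffer.empty[String]
    val machine = new SimplifiedStateMachine(errorLog)
    // Stand-in for ZooKeeper child-change listeners (e.g. topic creation).
    val listeners = mutable.Buffer.empty[String => Unit]
    def registerListeners(): Unit =
      listeners += ((topic: String) => machine.handleStateChanges(Seq(s"$topic-0")))
    def fireTopicCreated(topic: String): Unit = listeners.foreach(_(topic))

    if (listenersFirst) {
      // Current order: listeners are live before startup(), so an event can slip in.
      registerListeners()
      fireTopicCreated("foo")
      machine.startup()
    } else {
      // Proposed order: start the state machine, then register listeners.
      machine.startup()
      registerListeners()
      fireTopicCreated("foo")
    }
    fireTopicCreated("bar") // a later, unrelated event
    errorLog.toSeq
  }

  def main(args: Array[String]): Unit = {
    run(listenersFirst = true).foreach(println)
    println(s"proposed order errors: ${run(listenersFirst = false).size}")
  }
}
```

With the current order, the event that arrives before {{startup()}} is logged and skipped but leaves {{LeaderAndIsr(foo-0)}} queued. The next, unrelated event then fails in {{newBatch()}}, which mirrors the {{IllegalStateException}} in the issue description. With the proposed order, both events go through. Clearing the batch in the {{catch}} block would stop the cascade, but it would still silently skip the {{foo}} update, which is why the comment argues for recovering or resigning instead.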
The controller rewrite should address that error handling, though. For this JIRA, I suggest we fix this race and revisit the issue if it reappears.

> Error while moving some partitions to OnlinePartition state
> ------------------------------------------------------------
>
>                 Key: KAFKA-3173
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3173
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.9.0.0
>            Reporter: Flavio Junqueira
>            Assignee: Flavio Junqueira
>            Priority: Critical
>             Fix For: 0.10.0.0
>
>
> We observed another instance of the problem reported in KAFKA-2300, but this
> time the error appeared in the partition state machine. In KAFKA-2300, we
> didn't clean up the state in {{PartitionStateMachine}} and
> {{ReplicaStateMachine}} as we do in {{KafkaController}}.
> Here is the stack trace:
> {noformat}
> 2016-01-29 15:26:51,393] ERROR [Partition state machine on Controller 0]: Error while moving some partitions to OnlinePartition state (kafka.controller.PartitionStateMachine)
> java.lang.IllegalStateException: Controller to broker state change requests batch is not empty while creating a new one. Some LeaderAndIsr state changes Map(0 -> Map(foo-0 -> (LeaderAndIsrInfo: (Leader:0,ISR:0,LeaderEpoch:0,ControllerEpoch:1),ReplicationFactor:1),AllReplicas:0))) might be lost
>     at kafka.controller.ControllerBrokerRequestBatch.newBatch(ControllerChannelManager.scala:254)
>     at kafka.controller.PartitionStateMachine.handleStateChanges(PartitionStateMachine.scala:144)
>     at kafka.controller.KafkaController.onNewPartitionCreation(KafkaController.scala:517)
>     at kafka.controller.KafkaController.onNewTopicCreation(KafkaController.scala:504)
>     at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply$mcV$sp(PartitionStateMachine.scala:437)
>     at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
>     at kafka.controller.PartitionStateMachine$TopicChangeListener$$anonfun$handleChildChange$1.apply(PartitionStateMachine.scala:419)
>     at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:262)
>     at kafka.controller.PartitionStateMachine$TopicChangeListener.handleChildChange(PartitionStateMachine.scala:418)
>     at org.I0Itec.zkclient.ZkClient$10.run(ZkClient.java:842)
>     at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> {noformat}