[ https://issues.apache.org/jira/browse/KAFKA-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ismael Juma updated KAFKA-3964: ------------------------------- Labels: reliability (was: ) > Metadata update requests are sometimes received after LeaderAndIsrRequests > -------------------------------------------------------------------------- > > Key: KAFKA-3964 > URL: https://issues.apache.org/jira/browse/KAFKA-3964 > Project: Kafka > Issue Type: Bug > Components: controller > Reporter: Maysam Yabandeh > Priority: Minor > Labels: reliability > > The broker needs metadata of the leader before being able to process > LeaderAndIsrRequest from the controller. For this reason on broker startup > the controller first sends the metadata update requests and AFTER that it > sends the LeaderAndIsrRequests: > {code} > def onBrokerStartup(newBrokers: Seq[Int]) { > info("New broker startup callback for > %s".format(newBrokers.mkString(","))) > val newBrokersSet = newBrokers.toSet > // send update metadata request to all live and shutting down brokers. > Old brokers will get to know of the new > // broker via this update. > // In cases of controlled shutdown leaders will not be elected when a new > broker comes up. So at least in the > // common controlled shutdown case, the metadata will reach the new > brokers faster > > sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) > // the very first thing to do when a new broker comes up is send it the > entire list of partitions that it is > // supposed to host. Based on that the broker starts the high watermark > threads for the input list of partitions > val allReplicasOnNewBrokers = > controllerContext.replicasOnBrokers(newBrokersSet) > replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, > OnlineReplica) > {code} > However this protocol is not followed when a nodes becomes the controller: it > sends LeaderAndIsrRequests BEFORE sending the metadata update requests: > {code} > def onControllerFailover() { > ... > replicaStateMachine.startup() > ... > /* send partition leadership info to all live brokers */ > sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) > {code} > ReplicaStateMachine::startup > {code} > def startup() { > ... > // move all Online replicas to Online > handleStateChanges(controllerContext.allLiveReplicas(), > OnlineReplica){code} > which trigger LeaderAndIsrRequest messages. > Here is the symptoms that one would observe when this problem manifests: > # The first set of messages that the broker receives from the controller is > LeaderAndIsrRequests > # The broker fails to become the follower as requested by the controller > {code} > 2016-07-12 21:03:53,081 ERROR change.logger: Broker 14 received > LeaderAndIsrRequest with correlation id 0 from controller 21 epoch 290 for > partition [topicxyz,7] but cannot become follower since the new leader 22 is > unavailable. > {code} > # The fetcher hence does not start and the partition remains under-replicated. -- This message was sent by Atlassian JIRA (v6.3.15#6346)