KAFKA-999 Controlled shutdown never succeeds until the broker is killed; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9753d15e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9753d15e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9753d15e Branch: refs/heads/trunk Commit: 9753d15ef1da0ecab9bbee51d8cdc08c2159c50b Parents: 1d6ad3d Author: Swapnil Ghike <[email protected]> Authored: Tue Aug 6 20:41:59 2013 -0700 Committer: Neha Narkhede <[email protected]> Committed: Tue Aug 6 20:42:08 2013 -0700 ---------------------------------------------------------------------- .../scala/kafka/api/LeaderAndIsrRequest.scala | 14 ++++++------- .../main/scala/kafka/cluster/Partition.scala | 21 ++++++++++---------- .../controller/ControllerChannelManager.scala | 7 +++---- .../kafka/controller/KafkaController.scala | 4 ++-- .../controller/PartitionStateMachine.scala | 4 ++-- .../kafka/controller/ReplicaStateMachine.scala | 2 +- .../scala/kafka/server/ReplicaManager.scala | 6 +++--- 7 files changed, 29 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala index a474474..981d2bb 100644 --- a/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala +++ b/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala @@ -135,13 +135,13 @@ case class LeaderAndIsrRequest (versionId: Short, controllerId: Int, controllerEpoch: Int, partitionStateInfos: Map[(String, Int), PartitionStateInfo], - aliveLeaders: Set[Broker]) + leaders: Set[Broker]) extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey), correlationId) { - def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], aliveLeaders: Set[Broker], controllerId: Int, + def this(partitionStateInfos: Map[(String, Int), PartitionStateInfo], leaders: Set[Broker], controllerId: Int, controllerEpoch: Int, correlationId: Int, clientId: String) = { this(LeaderAndIsrRequest.CurrentVersion, correlationId, clientId, - controllerId, controllerEpoch, partitionStateInfos, aliveLeaders) + controllerId, controllerEpoch, partitionStateInfos, leaders) } def writeTo(buffer: ByteBuffer) { @@ -156,8 +156,8 @@ case class LeaderAndIsrRequest (versionId: Short, buffer.putInt(key._2) value.writeTo(buffer) } - buffer.putInt(aliveLeaders.size) - aliveLeaders.foreach(_.writeTo(buffer)) + buffer.putInt(leaders.size) + leaders.foreach(_.writeTo(buffer)) } def sizeInBytes(): Int = { @@ -171,7 +171,7 @@ case class LeaderAndIsrRequest (versionId: Short, for((key, value) <- partitionStateInfos) size += (2 + key._1.length) /* topic */ + 4 /* partition */ + value.sizeInBytes /* partition state info */ size += 4 /* number of leader brokers */ - for(broker <- aliveLeaders) + for(broker <- leaders) size += broker.sizeInBytes /* broker info */ size } @@ -185,7 +185,7 @@ case class LeaderAndIsrRequest (versionId: Short, leaderAndIsrRequest.append(";CorrelationId:" + correlationId) leaderAndIsrRequest.append(";ClientId:" + clientId) leaderAndIsrRequest.append(";PartitionState:" + partitionStateInfos.mkString(",")) - leaderAndIsrRequest.append(";Leaders:" + aliveLeaders.mkString(",")) + leaderAndIsrRequest.append(";Leaders:" + leaders.mkString(",")) leaderAndIsrRequest.toString() } http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/cluster/Partition.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala index 88fc8dd..a9bb3c8 100644 --- a/core/src/main/scala/kafka/cluster/Partition.scala +++ b/core/src/main/scala/kafka/cluster/Partition.scala @@ -167,7 +167,7 @@ class Partition(val topic: String, * 4. start a fetcher to the new leader */ def makeFollower(controllerId: Int, topic: String, partitionId: Int, leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch, - aliveLeaders: Set[Broker], correlationId: Int): Boolean = { + leaders: Set[Broker], correlationId: Int): Boolean = { leaderIsrUpdateLock synchronized { val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr if (leaderEpoch >= leaderAndIsr.leaderEpoch) { @@ -186,7 +186,8 @@ class Partition(val topic: String, // on the leader val localReplica = getOrCreateReplica() val newLeaderBrokerId: Int = leaderAndIsr.leader - aliveLeaders.find(_.id == newLeaderBrokerId) match { + // TODO: Delete leaders from LeaderAndIsrRequest in 0.8.1 + leaders.find(_.id == newLeaderBrokerId) match { case Some(leaderBroker) => // stop fetcher thread to previous leader replicaFetcherManager.removeFetcher(topic, partitionId) @@ -200,15 +201,15 @@ class Partition(val topic: String, replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker) } else { - stateChangeLogger.trace("Broker %d ignored the become-follower state change with correlation id %d from " + - " controller %d epoch %d since it is shutting down" - .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch)) + stateChangeLogger.trace(("Broker %d ignored the become-follower state change with correlation id %d from " + + "controller %d epoch %d since it is shutting down") + .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch)) } - case None => // leader went down - stateChangeLogger.trace("Broker %d aborted the become-follower state change with correlation id %d from " + - " controller %d epoch %d since leader %d for partition [%s,%d] is unavailable during the state change operation" - .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, - newLeaderBrokerId, topic, partitionId)) + case None => // we should not come here + stateChangeLogger.error(("Broker %d aborted the become-follower state change with correlation id %d from " + + "controller %d epoch %d for partition [%s,%d] new leader %d") + .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, + topic, partitionId, newLeaderBrokerId)) } true } http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 38b8674..ed1ce0b 100644 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -229,14 +229,13 @@ class ControllerBrokerRequestBatch(controllerContext: ControllerContext, sendReq } } - def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int, liveBrokers: Set[Broker]) { + def sendRequestsToBrokers(controllerEpoch: Int, correlationId: Int) { leaderAndIsrRequestMap.foreach { m => val broker = m._1 val partitionStateInfos = m._2.toMap val leaderIds = partitionStateInfos.map(_._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader).toSet - val aliveLeaders = liveBrokers.filter(b => leaderIds.contains(b.id)) - val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, aliveLeaders, controllerId, controllerEpoch, correlationId, - clientId) + val leaders = controllerContext.liveOrShuttingDownBrokers.filter(b => leaderIds.contains(b.id)) + val leaderAndIsrRequest = new LeaderAndIsrRequest(partitionStateInfos, leaders, controllerId, controllerEpoch, correlationId, clientId) for (p <- partitionStateInfos) { val typeOfRequest = if (broker == p._2.leaderIsrAndControllerEpoch.leaderAndIsr.leader) "become-leader" else "become-follower" stateChangeLogger.trace(("Controller %d epoch %d sending %s LeaderAndIsr request with correlationId %d to broker %d " + http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index b07e27b..c87caab 100644 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -168,7 +168,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg // before which the stop replica request should be completed (in most cases) brokerRequestBatch.newBatch() brokerRequestBatch.addStopReplicaRequestForBrokers(Seq(id), topicAndPartition.topic, topicAndPartition.partition, deletePartition = false) - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers) + brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) // If the broker is a follower, updates the isr in ZK and notifies the current leader replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, @@ -656,7 +656,7 @@ class KafkaController(val config : KafkaConfig, zkClient: ZkClient) extends Logg private def sendUpdateMetadataRequest(brokers: Seq[Int], partitions: Set[TopicAndPartition] = Set.empty[TopicAndPartition]) { brokerRequestBatch.newBatch() brokerRequestBatch.addUpdateMetadataRequestForBrokers(brokers, partitions) - brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers) + brokerRequestBatch.sendRequestsToBrokers(epoch, controllerContext.correlationId.getAndIncrement) } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/controller/PartitionStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala index 0135d45..a084830 100644 --- a/core/src/main/scala/kafka/controller/PartitionStateMachine.scala +++ b/core/src/main/scala/kafka/controller/PartitionStateMachine.scala @@ -89,7 +89,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { if(partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition)) handleStateChange(topicAndPartition.topic, topicAndPartition.partition, OnlinePartition, controller.offlinePartitionSelector) } - brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers) + brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement) } catch { case e => error("Error while moving some partitions to the online state", e) // TODO: It is not enough to bail out and log an error, it is important to trigger leader election for those partitions @@ -109,7 +109,7 @@ class PartitionStateMachine(controller: KafkaController) extends Logging { partitions.foreach { topicAndPartition => handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector) } - brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers) + brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement) }catch { case e => error("Error while moving some partitions to %s state".format(targetState), e) // TODO: It is not enough to bail out and log an error, it is important to trigger state changes for those partitions http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala index 0c9d436..9f752f4 100644 --- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala +++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala @@ -87,7 +87,7 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging { try { brokerRequestBatch.newBatch() replicas.foreach(r => handleStateChange(r.topic, r.partition, r.replica, targetState)) - brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement, controllerContext.liveBrokers) + brokerRequestBatch.sendRequestsToBrokers(controller.epoch, controllerContext.correlationId.getAndIncrement) }catch { case e => error("Error while moving some replicas to %s state".format(targetState), e) } http://git-wip-us.apache.org/repos/asf/kafka/blob/9753d15e/core/src/main/scala/kafka/server/ReplicaManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index d885ba1..f551243 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -220,7 +220,7 @@ class ReplicaManager(val config: KafkaConfig, if(requestedLeaderId == config.brokerId) makeLeader(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.correlationId) else - makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.aliveLeaders, + makeFollower(controllerId, controllerEpoch, topic, partitionId, partitionStateInfo, leaderAndISRRequest.leaders, leaderAndISRRequest.correlationId) } catch { case e => @@ -264,14 +264,14 @@ class ReplicaManager(val config: KafkaConfig, } private def makeFollower(controllerId: Int, epoch: Int, topic: String, partitionId: Int, - partitionStateInfo: PartitionStateInfo, aliveLeaders: Set[Broker], correlationId: Int) { + partitionStateInfo: PartitionStateInfo, leaders: Set[Broker], correlationId: Int) { val leaderIsrAndControllerEpoch = partitionStateInfo.leaderIsrAndControllerEpoch stateChangeLogger.trace(("Broker %d received LeaderAndIsr request correlationId %d from controller %d epoch %d " + "starting the become-follower transition for partition [%s,%d]") .format(localBrokerId, correlationId, controllerId, epoch, topic, partitionId)) val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor) - if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, aliveLeaders, correlationId)) { + if (partition.makeFollower(controllerId, topic, partitionId, leaderIsrAndControllerEpoch, leaders, correlationId)) { // remove this replica's partition from the ISR expiration queue leaderPartitionsLock synchronized { leaderPartitions -= partition
