KAFKA-831 Controller does not send the complete list of partitions to a newly started broker; reviewed by Jun Rao and Swapnil Ghike
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f6af315
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f6af315
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f6af315

Branch: refs/heads/trunk
Commit: 9f6af315ca101272748de54fd1347c9def7d80af
Parents: 6e05d7d
Author: Neha Narkhede <[email protected]>
Authored: Fri Mar 29 08:24:25 2013 -0700
Committer: Neha Narkhede <[email protected]>
Committed: Fri Mar 29 08:26:41 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala | 2 +-
 .../kafka/controller/ReplicaStateMachine.scala | 23 +++++++++----------
 2 files changed, 11 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f6af315/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 6e73003..2ca7ee6 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -195,7 +195,7 @@ class Partition(val topic: String,
  replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
  case None => // leader went down
  stateChangeLogger.trace("Broker %d aborted the become-follower state change with correlation id %d from " +
- " controller %d epoch %d since leader %d for partition [%s,%d] became unavailable during the state change operation"
+ " controller %d epoch %d since leader %d for partition [%s,%d] is unavailable during the state change operation"
  .format(localBrokerId, correlationId, controllerId, leaderIsrAndControllerEpoch.controllerEpoch, newLeaderBrokerId, topic, partitionId))
  }
http://git-wip-us.apache.org/repos/asf/kafka/blob/9f6af315/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 199640b..3cf1da3 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -143,21 +143,18 @@ class ReplicaStateMachine(controller: KafkaController) extends Logging {
  stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
  .format(controllerId, controller.epoch, replicaId, topicAndPartition))
  case _ =>
- // check if the leader for this partition is alive or even exists
- controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
+ // check if the leader for this partition ever existed
+ controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
  case Some(leaderIsrAndControllerEpoch) =>
- controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader) match {
- case true => // leader is alive
- brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
- topic, partition, leaderIsrAndControllerEpoch,
- replicaAssignment.size)
- replicaState.put((topic, partition, replicaId), OnlineReplica)
- stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
- .format(controllerId, controller.epoch, replicaId, topicAndPartition))
- case false => // ignore partitions whose leader is not alive
- }
- case None => // ignore partitions who don't have a leader yet
+ brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, partition, leaderIsrAndControllerEpoch,
+ replicaAssignment.size)
+ replicaState.put((topic, partition, replicaId), OnlineReplica)
+ stateChangeLogger.trace("Controller %d epoch %d changed state of replica %d for partition %s to OnlineReplica"
+ .format(controllerId, controller.epoch, replicaId, topicAndPartition))
+ case None => // that means the partition was never in OnlinePartition state, this means the broker never
+ // started a log for that partition and does not have a high watermark value for this partition
  }
+ }
  replicaState.put((topic, partition, replicaId), OnlineReplica)
  case OfflineReplica =>
