KAFKA-831: Controller does not send the complete list of partitions to a newly 
started broker; reviewed by Jun Rao and Swapnil Ghike


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9f6af315
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9f6af315
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9f6af315

Branch: refs/heads/trunk
Commit: 9f6af315ca101272748de54fd1347c9def7d80af
Parents: 6e05d7d
Author: Neha Narkhede <[email protected]>
Authored: Fri Mar 29 08:24:25 2013 -0700
Committer: Neha Narkhede <[email protected]>
Committed: Fri Mar 29 08:26:41 2013 -0700

----------------------------------------------------------------------
 .../main/scala/kafka/cluster/Partition.scala    |  2 +-
 .../kafka/controller/ReplicaStateMachine.scala  | 23 +++++++++-----------
 2 files changed, 11 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9f6af315/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala 
b/core/src/main/scala/kafka/cluster/Partition.scala
index 6e73003..2ca7ee6 100644
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -195,7 +195,7 @@ class Partition(val topic: String,
           replicaFetcherManager.addFetcher(topic, partitionId, 
localReplica.logEndOffset, leaderBroker)
         case None => // leader went down
           stateChangeLogger.trace("Broker %d aborted the become-follower state 
change with correlation id %d from " +
-            " controller %d epoch %d since leader %d for partition [%s,%d] 
became unavailable during the state change operation"
+            " controller %d epoch %d since leader %d for partition [%s,%d] is 
unavailable during the state change operation"
                                      .format(localBrokerId, correlationId, 
controllerId, leaderIsrAndControllerEpoch.controllerEpoch,
                                               newLeaderBrokerId, topic, 
partitionId))
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9f6af315/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala 
b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
index 199640b..3cf1da3 100644
--- a/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
+++ b/core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
@@ -143,21 +143,18 @@ class ReplicaStateMachine(controller: KafkaController) 
extends Logging {
               stateChangeLogger.trace("Controller %d epoch %d changed state of 
replica %d for partition %s to OnlineReplica"
                                         .format(controllerId, 
controller.epoch, replicaId, topicAndPartition))
             case _ =>
-              // check if the leader for this partition is alive or even exists
-                
controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
+              // check if the leader for this partition ever existed
+              controllerContext.partitionLeadershipInfo.get(topicAndPartition) 
match {
                 case Some(leaderIsrAndControllerEpoch) =>
-                  
controllerContext.liveBrokerIds.contains(leaderIsrAndControllerEpoch.leaderAndIsr.leader)
 match {
-                    case true => // leader is alive
-                      
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId),
-                                                                          
topic, partition, leaderIsrAndControllerEpoch,
-                                                                          
replicaAssignment.size)
-                      replicaState.put((topic, partition, replicaId), 
OnlineReplica)
-                      stateChangeLogger.trace("Controller %d epoch %d changed 
state of replica %d for partition %s to OnlineReplica"
-                                                .format(controllerId, 
controller.epoch, replicaId, topicAndPartition))
-                    case false => // ignore partitions whose leader is not 
alive
-                  }
-                case None => // ignore partitions who don't have a leader yet
+                  
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(List(replicaId), topic, 
partition, leaderIsrAndControllerEpoch,
+                    replicaAssignment.size)
+                  replicaState.put((topic, partition, replicaId), 
OnlineReplica)
+                  stateChangeLogger.trace("Controller %d epoch %d changed 
state of replica %d for partition %s to OnlineReplica"
+                    .format(controllerId, controller.epoch, replicaId, 
topicAndPartition))
+                case None => // that means the partition was never in 
OnlinePartition state, this means the broker never
+                  // started a log for that partition and does not have a high 
watermark value for this partition
               }
+
           }
           replicaState.put((topic, partition, replicaId), OnlineReplica)
         case OfflineReplica =>

Reply via email to