KAFKA-831 Controller does not send the complete list of partitions to a newly 
started broker; reviewed by Jun Rao


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1d5e95f6
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1d5e95f6
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1d5e95f6

Branch: refs/heads/trunk
Commit: 1d5e95f6c4067884a374deddd88ec3f471664658
Parents: 66b1038
Author: Neha Narkhede <neha.narkh...@gmail.com>
Authored: Thu Mar 28 16:29:37 2013 -0700
Committer: Neha Narkhede <neha.narkh...@gmail.com>
Committed: Thu Mar 28 16:29:37 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/controller/KafkaController.scala | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1d5e95f6/core/src/main/scala/kafka/controller/KafkaController.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala 
b/core/src/main/scala/kafka/controller/KafkaController.scala
index 9d32901..47d4d7b 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -273,9 +273,12 @@ class KafkaController(val config : KafkaConfig, zkClient: 
ZkClient) extends Logg
     info("New broker startup callback for %s".format(newBrokers.mkString(",")))
 
     val newBrokersSet = newBrokers.toSet
-    // update partition state machine
-    partitionStateMachine.triggerOnlinePartitionStateChange()
+    // the very first thing to do when a new broker comes up is send it the 
entire list of partitions that it is
+    // supposed to host. Based on that the broker starts the high watermark 
threads for the input list of partitions
     replicaStateMachine.handleStateChanges(getAllReplicasOnBroker(zkClient, 
controllerContext.allTopics.toSeq, newBrokers), OnlineReplica)
+    // when a new broker comes up, the controller needs to trigger leader 
election for all new and offline partitions
+    // to see if these brokers can become leaders for some/all of those
+    partitionStateMachine.triggerOnlinePartitionStateChange()
 
     // check if reassignment of some partitions need to be restarted
     val partitionsWithReplicasOnNewBrokers = 
controllerContext.partitionsBeingReassigned.filter{

Reply via email to