Repository: kafka
Updated Branches:
  refs/heads/trunk f2d4ed5bc -> 4922a51ed


KAFKA-2146: Fix adding partition did not find the correct startIndex.

TopicCommand provides a tool for adding partitions to existing topics. It tries
to find the startIndex from the existing partitions. There is a minor flaw in
this process: it uses the first partition fetched from ZooKeeper as the start
partition, and uses the first replica id in that partition as the startIndex.
First, the first partition fetched from ZooKeeper is not necessarily the start
partition. Because partition ids begin at zero, we should use the partition
with id zero as the start partition.
Second, broker ids do not necessarily begin at 0, so the startIndex is not
necessarily the first replica id in the start partition. It should instead be
the position of that replica id in the sorted broker list.
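
The lookup described above can be sketched in isolation. This is a minimal
illustration, not the code in AdminUtils: the case class TopicAndPartition is a
simplified stand-in, and StartIndexSketch, startIndex, and the sample broker
ids are hypothetical names and values chosen for the example.

```scala
// Simplified stand-in for Kafka's TopicAndPartition (assumption: only these fields matter here).
case class TopicAndPartition(topic: String, partition: Int)

object StartIndexSketch {
  // Returns the position in the sorted broker list at which replica
  // assignment for the new partitions should start.
  def startIndex(existing: Map[TopicAndPartition, Seq[Int]], sortedBrokers: Seq[Int]): Int = {
    // Look up partition 0 explicitly instead of trusting iteration order.
    val replicasForPartitionZero = existing
      .collectFirst { case (tp, replicas) if tp.partition == 0 => replicas }
      .getOrElse(throw new IllegalStateException("no partition with id 0"))

    // Translate the first replica's broker id into an index in the broker list.
    val idx = sortedBrokers.indexWhere(_ >= replicasForPartitionZero.head)
    if (idx < 0) 0 else idx
  }

  def main(args: Array[String]): Unit = {
    // Partition 1 is deliberately listed first, as ZooKeeper might return it.
    val existing = Map(
      TopicAndPartition("t", 1) -> Seq(102, 103),
      TopicAndPartition("t", 0) -> Seq(101, 102)
    )
    val brokers = Seq(100, 101, 102, 103)
    println(startIndex(existing, brokers)) // prints 1
  }
}
```

With the old logic, the same input would have yielded 102 (the first replica id
of whichever partition came first), which is a broker id and not a valid index
into a four-element broker list.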

Author: chenshangan <[email protected]>

Reviewers: Guozhang Wang

Closes #329 from shangan/trunk-KAFKA-2146


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4922a51e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4922a51e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4922a51e

Branch: refs/heads/trunk
Commit: 4922a51edd91d862d9be92b1a652d2a83d936b2c
Parents: f2d4ed5
Author: Chen Shangan <[email protected]>
Authored: Thu Jan 21 16:18:13 2016 -0800
Committer: Guozhang Wang <[email protected]>
Committed: Thu Jan 21 16:18:13 2016 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/admin/AdminUtils.scala | 18 +++++++++++++-----
 1 file changed, 13 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4922a51e/core/src/main/scala/kafka/admin/AdminUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminUtils.scala b/core/src/main/scala/kafka/admin/AdminUtils.scala
index a8b3364..a868400 100644
--- a/core/src/main/scala/kafka/admin/AdminUtils.scala
+++ b/core/src/main/scala/kafka/admin/AdminUtils.scala
@@ -113,23 +113,31 @@ object AdminUtils extends Logging {
     if (existingPartitionsReplicaList.size == 0)
       throw new AdminOperationException("The topic %s does not exist".format(topic))
 
-    val existingReplicaList = existingPartitionsReplicaList.head._2
+    val existingReplicaListForPartitionZero = existingPartitionsReplicaList.find(p => p._1.partition == 0) match {
+      case None => throw new AdminOperationException("the topic does not have partition with id 0, it should never happen")
+      case Some(headPartitionReplica) => headPartitionReplica._2
+    }
     val partitionsToAdd = numPartitions - existingPartitionsReplicaList.size
     if (partitionsToAdd <= 0)
       throw new AdminOperationException("The number of partitions for a topic can only be increased")
 
     // create the new partition replication list
     val brokerList = zkUtils.getSortedBrokerList()
-    val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "")
-      AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaList.size, existingReplicaList.head, existingPartitionsReplicaList.size)
+    val newPartitionReplicaList = if (replicaAssignmentStr == null || replicaAssignmentStr == "") {
+      var startIndex = brokerList.indexWhere(_ >= existingReplicaListForPartitionZero.head)
+      if(startIndex < 0) {
+        startIndex = 0
+      }
+      AdminUtils.assignReplicasToBrokers(brokerList, partitionsToAdd, existingReplicaListForPartitionZero.size, startIndex, existingPartitionsReplicaList.size)
+    }
     else
       getManualReplicaAssignment(replicaAssignmentStr, brokerList.toSet, existingPartitionsReplicaList.size, checkBrokerAvailable)
 
     // check if manual assignment has the right replication factor
-    val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaList.size))
+    val unmatchedRepFactorList = newPartitionReplicaList.values.filter(p => (p.size != existingReplicaListForPartitionZero.size))
     if (unmatchedRepFactorList.size != 0)
       throw new AdminOperationException("The replication factor in manual replication assignment " +
-        " is not equal to the existing replication factor for the topic " + existingReplicaList.size)
+        " is not equal to the existing replication factor for the topic " + existingReplicaListForPartitionZero.size)
 
     info("Add partition list for %s is %s".format(topic, newPartitionReplicaList))
     val partitionReplicaList = existingPartitionsReplicaList.map(p => p._1.partition -> p._2)
