This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b0d840d  KAFKA-5928; Avoid redundant requests to zookeeper when 
reassign topic partition
b0d840d is described below

commit b0d840d34b4172add831367e8fc2c51e75efb549
Author: uncleGen <[email protected]>
AuthorDate: Mon Aug 6 20:04:55 2018 -0700

    KAFKA-5928; Avoid redundant requests to zookeeper when reassign topic 
partition
    
    Author: uncleGen <[email protected]>
    
    Reviewers: Ismael Juma <[email protected]>, Dong Lin <[email protected]>
    
    Closes #3894 from uncleGen/KAFKA-5928
---
 .../kafka/admin/ReassignPartitionsCommand.scala    | 74 +++++++++++-----------
 .../scala/unit/kafka/admin/DeleteTopicTest.scala   |  5 +-
 .../admin/ReassignPartitionsCommandTest.scala      | 15 ++---
 3 files changed, 44 insertions(+), 50 deletions(-)

diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 4d9da90..041375a 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -347,13 +347,24 @@ object ReassignPartitionsCommand extends Logging {
     (partitionsToBeReassigned, replicaAssignment)
   }
 
-  private def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient, 
partitionsToBeReassigned: Map[TopicPartition, Seq[Int]])
+  def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient, 
partitionsToBeReassigned: Map[TopicPartition, Seq[Int]])
   :Map[TopicPartition, ReassignmentStatus] = {
     val partitionsBeingReassigned = zkClient.getPartitionReassignment
-
-    partitionsToBeReassigned.keys.map { topicAndPartition =>
-      (topicAndPartition, checkIfPartitionReassignmentSucceeded(zkClient, 
topicAndPartition, partitionsToBeReassigned,
-        partitionsBeingReassigned))
+    val (beingReassigned, notBeingReassigned) = 
partitionsToBeReassigned.keys.partition { topicAndPartition =>
+      partitionsBeingReassigned.contains(topicAndPartition)
+    }
+    notBeingReassigned.groupBy(_.topic).flatMap { case (topic, partitions) =>
+      val replicasForTopic = 
zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
+      partitions.map { topicAndPartition =>
+        val newReplicas = partitionsToBeReassigned(topicAndPartition)
+        val reassignmentStatus = replicasForTopic.get(topicAndPartition) match 
{
+          case Some(seq) if seq == newReplicas => ReassignmentCompleted
+          case _ => ReassignmentFailed
+        }
+        (topicAndPartition, reassignmentStatus)
+      }
+    } ++ beingReassigned.map { topicAndPartition =>
+      (topicAndPartition, ReassignmentInProgress)
     }.toMap
   }
 
@@ -398,25 +409,6 @@ object ReassignPartitionsCommand extends Logging {
     }
   }
 
-  def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient, 
topicAndPartition: TopicPartition,
-                                            partitionsToBeReassigned: 
Map[TopicPartition, Seq[Int]],
-                                            partitionsBeingReassigned: 
Map[TopicPartition, Seq[Int]]): ReassignmentStatus = {
-    val newReplicas = partitionsToBeReassigned(topicAndPartition)
-    partitionsBeingReassigned.get(topicAndPartition) match {
-      case Some(_) => ReassignmentInProgress
-      case None =>
-        // check if the current replica assignment matches the expected one 
after reassignment
-        val assignedReplicas = zkClient.getReplicasForPartition(new 
TopicPartition(topicAndPartition.topic, topicAndPartition.partition))
-        if(assignedReplicas == newReplicas)
-          ReassignmentCompleted
-        else {
-          println(("ERROR: Assigned replicas (%s) don't match the list of 
replicas for reassignment (%s)" +
-            " for partition %s").format(assignedReplicas.mkString(","), 
newReplicas.mkString(","), topicAndPartition))
-          ReassignmentFailed
-        }
-    }
-  }
-
   def validateAndParseArgs(args: Array[String]): 
ReassignPartitionsCommandOptions = {
     val opts = new ReassignPartitionsCommandOptions(args)
 
@@ -559,7 +551,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
   private[admin] def assignThrottledReplicas(existingPartitionAssignment: 
Map[TopicPartition, Seq[Int]],
                                              proposedPartitionAssignment: 
Map[TopicPartition, Seq[Int]],
                                              adminZkClient: AdminZkClient): 
Unit = {
-    for (topic <- proposedPartitionAssignment.keySet.map(_.topic).toSeq) {
+    for (topic <- 
proposedPartitionAssignment.keySet.map(_.topic).toSeq.distinct) {
       val existingPartitionAssignmentForTopic = 
existingPartitionAssignment.filter { case (tp, _) => tp.topic == topic }
       val proposedPartitionAssignmentForTopic = 
proposedPartitionAssignment.filter { case (tp, _) => tp.topic == topic }
 
@@ -621,7 +613,10 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
   def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 
10000L): Boolean = {
     maybeThrottle(throttle)
     try {
-      val validPartitions = proposedPartitionAssignment.filter { case (p, _) 
=> validatePartition(zkClient, p.topic, p.partition) }
+      val validPartitions = proposedPartitionAssignment.groupBy(_._1.topic())
+        .flatMap { case (topic, topicPartitionReplicas) =>
+          validatePartition(zkClient, topic, topicPartitionReplicas)
+        }
       if (validPartitions.isEmpty) false
       else {
         if (proposedReplicaAssignment.nonEmpty && adminClientOpt.isEmpty)
@@ -655,21 +650,24 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
     }
   }
 
-  def validatePartition(zkClient: KafkaZkClient, topic: String, partition: 
Int): Boolean = {
+  def validatePartition(zkClient: KafkaZkClient, topic: String, 
topicPartitionReplicas: Map[TopicPartition, Seq[Int]])
+  :Map[TopicPartition, Seq[Int]] = {
     // check if partition exists
     val partitionsOpt = 
zkClient.getPartitionsForTopics(immutable.Set(topic)).get(topic)
-    partitionsOpt match {
-      case Some(partitions) =>
-        if(partitions.contains(partition)) {
-          true
-        } else {
-          error("Skipping reassignment of partition [%s,%d] ".format(topic, 
partition) +
-            "since it doesn't exist")
+    topicPartitionReplicas.filter { case (topicPartition, _) =>
+      partitionsOpt match {
+        case Some(partitions) =>
+          if (partitions.contains(topicPartition.partition())) {
+            true
+          } else {
+            error("Skipping reassignment of partition [%s,%d] ".format(topic, 
topicPartition.partition()) +
+              "since it doesn't exist")
+            false
+          }
+        case None => error("Skipping reassignment of partition " +
+          "[%s,%d] since topic %s doesn't exist".format(topic, 
topicPartition.partition(), topic))
           false
-        }
-      case None => error("Skipping reassignment of partition " +
-        "[%s,%d] since topic %s doesn't exist".format(topic, partition, topic))
-        false
+      }
     }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala 
b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 4d089b3..12fb479 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -132,9 +132,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
     assertTrue("Partition reassignment should fail for [test,0]", 
reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-      val partitionsBeingReassigned = zkClient.getPartitionReassignment
-      
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
topicPartition,
-        Map(topicPartition -> newReplicas), partitionsBeingReassigned) == 
ReassignmentFailed
+      
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
Map(topicPartition -> newReplicas))
+        .getOrElse(topicPartition, fail(s"Failed to get reassignment status 
for $topicPartition")) == ReassignmentFailed
     }, "Partition reassignment shouldn't complete.")
     val controllerId = zkClient.getControllerId.getOrElse(fail("Controller 
doesn't exist"))
     val controller = servers.filter(s => s.config.brokerId == 
controllerId).head
diff --git 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 6978f8d..213c23a 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -439,9 +439,8 @@ class ReassignPartitionsCommandTest extends 
ZooKeeperTestHarness with Logging {
     assertTrue("Partition reassignment attempt failed for [test, 0]", 
reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkClient.getPartitionReassignment
-        
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
topicAndPartition,
-        Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == 
ReassignmentCompleted
+        
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
Map(topicAndPartition -> newReplicas))
+          .getOrElse(topicAndPartition, fail(s"Failed to get reassignment 
status for $topicAndPartition")) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
     val assignedReplicas = zkClient.getReplicasForPartition(new 
TopicPartition(topic, partitionToBeReassigned))
@@ -469,9 +468,8 @@ class ReassignPartitionsCommandTest extends 
ZooKeeperTestHarness with Logging {
     assertTrue("Partition reassignment failed for test, 0", 
reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkClient.getPartitionReassignment
-        
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
topicAndPartition,
-          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == 
ReassignmentCompleted
+        
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
Map(topicAndPartition -> newReplicas))
+          .getOrElse(topicAndPartition, fail(s"Failed to get reassignment 
status for $topicAndPartition")) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
     val assignedReplicas = zkClient.getReplicasForPartition(new 
TopicPartition(topic, partitionToBeReassigned))
@@ -498,9 +496,8 @@ class ReassignPartitionsCommandTest extends 
ZooKeeperTestHarness with Logging {
     assertTrue("Partition reassignment failed for test, 0", 
reassignPartitionsCommand.reassignPartitions())
     // wait until reassignment is completed
     TestUtils.waitUntilTrue(() => {
-        val partitionsBeingReassigned = zkClient.getPartitionReassignment
-        
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
topicAndPartition,
-          Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) == 
ReassignmentCompleted
+        
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient, 
Map(topicAndPartition -> newReplicas))
+          .getOrElse(topicAndPartition, fail(s"Failed to get reassignment 
status for $topicAndPartition")) == ReassignmentCompleted
       },
       "Partition reassignment should complete")
     val assignedReplicas = zkClient.getReplicasForPartition(new 
TopicPartition(topic, partitionToBeReassigned))

Reply via email to