Repository: kafka
Updated Branches:
  refs/heads/trunk 9e72c12e9 -> 56e5627da


KAFKA-4445; PreferredLeaderElectionCommand should query zookeeper only once per topic

Author: Dong Lin <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Jiangjie Qin <[email protected]>

Closes #2170 from lindong28/KAFAK-4445


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/56e5627d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/56e5627d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/56e5627d

Branch: refs/heads/trunk
Commit: 56e5627da5bfa01d1fa95e760a6f45949f89996a
Parents: 9e72c12
Author: Dong Lin <[email protected]>
Authored: Wed Dec 7 10:08:46 2016 -0800
Committer: Jiangjie Qin <[email protected]>
Committed: Wed Dec 7 10:08:46 2016 -0800

----------------------------------------------------------------------
 .../PreferredReplicaLeaderElectionCommand.scala | 34 +++++++-------------
 1 file changed, 11 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/56e5627d/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala 
b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
index 81014b1..960d526 100755
--- 
a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
+++ 
b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala
@@ -67,7 +67,6 @@ object PreferredReplicaLeaderElectionCommand extends Logging {
       val preferredReplicaElectionCommand = new 
PreferredReplicaLeaderElectionCommand(zkUtils, 
partitionsForPreferredReplicaElection)
 
       preferredReplicaElectionCommand.moveLeaderToPreferredReplica()
-      println("Successfully started preferred replica election for partitions 
%s".format(partitionsForPreferredReplicaElection))
     } catch {
       case e: Throwable =>
         println("Failed to start preferred replica election")
@@ -107,7 +106,7 @@ object PreferredReplicaLeaderElectionCommand extends 
Logging {
     val jsonData = Json.encode(Map("version" -> 1, "partitions" -> 
partitionsList))
     try {
       zkUtils.createPersistentPath(zkPath, jsonData)
-      info("Created preferred replica election path with %s".format(jsonData))
+      println("Created preferred replica election path with 
%s".format(jsonData))
     } catch {
       case _: ZkNodeExistsException =>
         val partitionsUndergoingPreferredReplicaElection =
@@ -119,32 +118,21 @@ object PreferredReplicaLeaderElectionCommand extends 
Logging {
   }
 }
 
-class PreferredReplicaLeaderElectionCommand(zkUtils: ZkUtils, partitions: 
scala.collection.Set[TopicAndPartition])
-  extends Logging {
+class PreferredReplicaLeaderElectionCommand(zkUtils: ZkUtils, 
partitionsFromUser: scala.collection.Set[TopicAndPartition]) {
   def moveLeaderToPreferredReplica() = {
     try {
-      val validPartitions = partitions.filter(p => validatePartition(zkUtils, 
p.topic, p.partition))
+      val topics = partitionsFromUser.map(_.topic).toSet
+      val partitionsFromZk = 
zkUtils.getPartitionsForTopics(topics.toSeq).flatMap{ case (topic, partitions) 
=>
+        partitions.map(TopicAndPartition(topic, _))
+      }.toSet
+
+      val (validPartitions, invalidPartitions) = 
partitionsFromUser.partition(partitionsFromZk.contains)
       
PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils,
 validPartitions)
+
+      println("Successfully started preferred replica election for partitions 
%s".format(validPartitions))
+      invalidPartitions.foreach(p => println("Skipping preferred replica 
leader election for partition %s since it doesn't exist.".format(p)))
     } catch {
       case e: Throwable => throw new AdminCommandFailedException("Admin 
command failed", e)
     }
   }
-
-  def validatePartition(zkUtils: ZkUtils, topic: String, partition: Int): 
Boolean = {
-    // check if partition exists
-    val partitionsOpt = zkUtils.getPartitionsForTopics(List(topic)).get(topic)
-    partitionsOpt match {
-      case Some(partitions) =>
-        if(partitions.contains(partition)) {
-          true
-        } else {
-          error("Skipping preferred replica leader election for partition 
[%s,%d] ".format(topic, partition) +
-            "since it doesn't exist")
-          false
-        }
-      case None => error("Skipping preferred replica leader election for 
partition " +
-        "[%s,%d] since topic %s doesn't exist".format(topic, partition, topic))
-        false
-    }
-  }
 }

Reply via email to