Repository: kafka Updated Branches: refs/heads/trunk 9e72c12e9 -> 56e5627da
KAFKA-4445; PreferredLeaderElectionCommand should query zookeeper only once per topic Author: Dong Lin <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jiangjie Qin <[email protected]> Closes #2170 from lindong28/KAFAK-4445 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/56e5627d Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/56e5627d Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/56e5627d Branch: refs/heads/trunk Commit: 56e5627da5bfa01d1fa95e760a6f45949f89996a Parents: 9e72c12 Author: Dong Lin <[email protected]> Authored: Wed Dec 7 10:08:46 2016 -0800 Committer: Jiangjie Qin <[email protected]> Committed: Wed Dec 7 10:08:46 2016 -0800 ---------------------------------------------------------------------- .../PreferredReplicaLeaderElectionCommand.scala | 34 +++++++------------- 1 file changed, 11 insertions(+), 23 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/56e5627d/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala index 81014b1..960d526 100755 --- a/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala +++ b/core/src/main/scala/kafka/admin/PreferredReplicaLeaderElectionCommand.scala @@ -67,7 +67,6 @@ object PreferredReplicaLeaderElectionCommand extends Logging { val preferredReplicaElectionCommand = new PreferredReplicaLeaderElectionCommand(zkUtils, partitionsForPreferredReplicaElection) preferredReplicaElectionCommand.moveLeaderToPreferredReplica() - println("Successfully started preferred replica election for partitions %s".format(partitionsForPreferredReplicaElection)) } catch { case e: Throwable => println("Failed to start preferred replica election") @@ -107,7 +106,7 @@ object PreferredReplicaLeaderElectionCommand extends Logging { val jsonData = Json.encode(Map("version" -> 1, "partitions" -> partitionsList)) try { zkUtils.createPersistentPath(zkPath, jsonData) - info("Created preferred replica election path with %s".format(jsonData)) + println("Created preferred replica election path with %s".format(jsonData)) } catch { case _: ZkNodeExistsException => val partitionsUndergoingPreferredReplicaElection = @@ -119,32 +118,21 @@ object PreferredReplicaLeaderElectionCommand extends Logging { } } -class PreferredReplicaLeaderElectionCommand(zkUtils: ZkUtils, partitions: scala.collection.Set[TopicAndPartition]) - extends Logging { +class PreferredReplicaLeaderElectionCommand(zkUtils: ZkUtils, partitionsFromUser: scala.collection.Set[TopicAndPartition]) { def moveLeaderToPreferredReplica() = { try { - val validPartitions = partitions.filter(p => validatePartition(zkUtils, p.topic, p.partition)) + val topics = partitionsFromUser.map(_.topic).toSet + val partitionsFromZk = zkUtils.getPartitionsForTopics(topics.toSeq).flatMap{ case (topic, partitions) => + partitions.map(TopicAndPartition(topic, _)) + }.toSet + + val (validPartitions, invalidPartitions) = partitionsFromUser.partition(partitionsFromZk.contains) PreferredReplicaLeaderElectionCommand.writePreferredReplicaElectionData(zkUtils, validPartitions) + + println("Successfully started preferred replica election for partitions %s".format(validPartitions)) + invalidPartitions.foreach(p => println("Skipping preferred replica leader election for partition %s since it doesn't exist.".format(p))) } catch { case e: Throwable => throw new AdminCommandFailedException("Admin command failed", e) } } - - def validatePartition(zkUtils: ZkUtils, topic: String, partition: Int): Boolean = { - // check if partition exists - val partitionsOpt = zkUtils.getPartitionsForTopics(List(topic)).get(topic) - partitionsOpt match { - case Some(partitions) => - if(partitions.contains(partition)) { - true - } else { - error("Skipping preferred replica leader election for partition [%s,%d] ".format(topic, partition) + - "since it doesn't exist") - false - } - case None => error("Skipping preferred replica leader election for partition " + - "[%s,%d] since topic %s doesn't exist".format(topic, partition, topic)) - false - } - } }
