This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b0d840d KAFKA-5928; Avoid redundant requests to ZooKeeper when
reassigning topic partitions
b0d840d is described below
commit b0d840d34b4172add831367e8fc2c51e75efb549
Author: uncleGen <[email protected]>
AuthorDate: Mon Aug 6 20:04:55 2018 -0700
KAFKA-5928; Avoid redundant requests to ZooKeeper when reassigning topic
partitions
Author: uncleGen <[email protected]>
Reviewers: Ismael Juma <[email protected]>, Dong Lin <[email protected]>
Closes #3894 from uncleGen/KAFKA-5928
---
.../kafka/admin/ReassignPartitionsCommand.scala | 74 +++++++++++-----------
.../scala/unit/kafka/admin/DeleteTopicTest.scala | 5 +-
.../admin/ReassignPartitionsCommandTest.scala | 15 ++---
3 files changed, 44 insertions(+), 50 deletions(-)
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 4d9da90..041375a 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -347,13 +347,24 @@ object ReassignPartitionsCommand extends Logging {
(partitionsToBeReassigned, replicaAssignment)
}
- private def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient,
partitionsToBeReassigned: Map[TopicPartition, Seq[Int]])
+ def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient,
partitionsToBeReassigned: Map[TopicPartition, Seq[Int]])
:Map[TopicPartition, ReassignmentStatus] = {
val partitionsBeingReassigned = zkClient.getPartitionReassignment
-
- partitionsToBeReassigned.keys.map { topicAndPartition =>
- (topicAndPartition, checkIfPartitionReassignmentSucceeded(zkClient,
topicAndPartition, partitionsToBeReassigned,
- partitionsBeingReassigned))
+ val (beingReassigned, notBeingReassigned) =
partitionsToBeReassigned.keys.partition { topicAndPartition =>
+ partitionsBeingReassigned.contains(topicAndPartition)
+ }
+ notBeingReassigned.groupBy(_.topic).flatMap { case (topic, partitions) =>
+ val replicasForTopic =
zkClient.getReplicaAssignmentForTopics(immutable.Set(topic))
+ partitions.map { topicAndPartition =>
+ val newReplicas = partitionsToBeReassigned(topicAndPartition)
+ val reassignmentStatus = replicasForTopic.get(topicAndPartition) match
{
+ case Some(seq) if seq == newReplicas => ReassignmentCompleted
+ case _ => ReassignmentFailed
+ }
+ (topicAndPartition, reassignmentStatus)
+ }
+ } ++ beingReassigned.map { topicAndPartition =>
+ (topicAndPartition, ReassignmentInProgress)
}.toMap
}
@@ -398,25 +409,6 @@ object ReassignPartitionsCommand extends Logging {
}
}
- def checkIfPartitionReassignmentSucceeded(zkClient: KafkaZkClient,
topicAndPartition: TopicPartition,
- partitionsToBeReassigned:
Map[TopicPartition, Seq[Int]],
- partitionsBeingReassigned:
Map[TopicPartition, Seq[Int]]): ReassignmentStatus = {
- val newReplicas = partitionsToBeReassigned(topicAndPartition)
- partitionsBeingReassigned.get(topicAndPartition) match {
- case Some(_) => ReassignmentInProgress
- case None =>
- // check if the current replica assignment matches the expected one
after reassignment
- val assignedReplicas = zkClient.getReplicasForPartition(new
TopicPartition(topicAndPartition.topic, topicAndPartition.partition))
- if(assignedReplicas == newReplicas)
- ReassignmentCompleted
- else {
- println(("ERROR: Assigned replicas (%s) don't match the list of
replicas for reassignment (%s)" +
- " for partition %s").format(assignedReplicas.mkString(","),
newReplicas.mkString(","), topicAndPartition))
- ReassignmentFailed
- }
- }
- }
-
def validateAndParseArgs(args: Array[String]):
ReassignPartitionsCommandOptions = {
val opts = new ReassignPartitionsCommandOptions(args)
@@ -559,7 +551,7 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
private[admin] def assignThrottledReplicas(existingPartitionAssignment:
Map[TopicPartition, Seq[Int]],
proposedPartitionAssignment:
Map[TopicPartition, Seq[Int]],
adminZkClient: AdminZkClient):
Unit = {
- for (topic <- proposedPartitionAssignment.keySet.map(_.topic).toSeq) {
+ for (topic <-
proposedPartitionAssignment.keySet.map(_.topic).toSeq.distinct) {
val existingPartitionAssignmentForTopic =
existingPartitionAssignment.filter { case (tp, _) => tp.topic == topic }
val proposedPartitionAssignmentForTopic =
proposedPartitionAssignment.filter { case (tp, _) => tp.topic == topic }
@@ -621,7 +613,10 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
def reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long =
10000L): Boolean = {
maybeThrottle(throttle)
try {
- val validPartitions = proposedPartitionAssignment.filter { case (p, _)
=> validatePartition(zkClient, p.topic, p.partition) }
+ val validPartitions = proposedPartitionAssignment.groupBy(_._1.topic())
+ .flatMap { case (topic, topicPartitionReplicas) =>
+ validatePartition(zkClient, topic, topicPartitionReplicas)
+ }
if (validPartitions.isEmpty) false
else {
if (proposedReplicaAssignment.nonEmpty && adminClientOpt.isEmpty)
@@ -655,21 +650,24 @@ class ReassignPartitionsCommand(zkClient: KafkaZkClient,
}
}
- def validatePartition(zkClient: KafkaZkClient, topic: String, partition:
Int): Boolean = {
+ def validatePartition(zkClient: KafkaZkClient, topic: String,
topicPartitionReplicas: Map[TopicPartition, Seq[Int]])
+ :Map[TopicPartition, Seq[Int]] = {
// check if partition exists
val partitionsOpt =
zkClient.getPartitionsForTopics(immutable.Set(topic)).get(topic)
- partitionsOpt match {
- case Some(partitions) =>
- if(partitions.contains(partition)) {
- true
- } else {
- error("Skipping reassignment of partition [%s,%d] ".format(topic,
partition) +
- "since it doesn't exist")
+ topicPartitionReplicas.filter { case (topicPartition, _) =>
+ partitionsOpt match {
+ case Some(partitions) =>
+ if (partitions.contains(topicPartition.partition())) {
+ true
+ } else {
+ error("Skipping reassignment of partition [%s,%d] ".format(topic,
topicPartition.partition()) +
+ "since it doesn't exist")
+ false
+ }
+ case None => error("Skipping reassignment of partition " +
+ "[%s,%d] since topic %s doesn't exist".format(topic,
topicPartition.partition(), topic))
false
- }
- case None => error("Skipping reassignment of partition " +
- "[%s,%d] since topic %s doesn't exist".format(topic, partition, topic))
- false
+ }
}
}
}
diff --git a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
index 4d089b3..12fb479 100644
--- a/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/DeleteTopicTest.scala
@@ -132,9 +132,8 @@ class DeleteTopicTest extends ZooKeeperTestHarness {
assertTrue("Partition reassignment should fail for [test,0]",
reassignPartitionsCommand.reassignPartitions())
// wait until reassignment is completed
TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = zkClient.getPartitionReassignment
-
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient,
topicPartition,
- Map(topicPartition -> newReplicas), partitionsBeingReassigned) ==
ReassignmentFailed
+
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient,
Map(topicPartition -> newReplicas))
+ .getOrElse(topicPartition, fail(s"Failed to get reassignment status
for $topicPartition")) == ReassignmentFailed
}, "Partition reassignment shouldn't complete.")
val controllerId = zkClient.getControllerId.getOrElse(fail("Controller
doesn't exist"))
val controller = servers.filter(s => s.config.brokerId ==
controllerId).head
diff --git
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 6978f8d..213c23a 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -439,9 +439,8 @@ class ReassignPartitionsCommandTest extends
ZooKeeperTestHarness with Logging {
assertTrue("Partition reassignment attempt failed for [test, 0]",
reassignPartitionsCommand.reassignPartitions())
// wait until reassignment is completed
TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = zkClient.getPartitionReassignment
-
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient,
topicAndPartition,
- Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) ==
ReassignmentCompleted
+
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient,
Map(topicAndPartition -> newReplicas))
+ .getOrElse(topicAndPartition, fail(s"Failed to get reassignment
status for $topicAndPartition")) == ReassignmentCompleted
},
"Partition reassignment should complete")
val assignedReplicas = zkClient.getReplicasForPartition(new
TopicPartition(topic, partitionToBeReassigned))
@@ -469,9 +468,8 @@ class ReassignPartitionsCommandTest extends
ZooKeeperTestHarness with Logging {
assertTrue("Partition reassignment failed for test, 0",
reassignPartitionsCommand.reassignPartitions())
// wait until reassignment is completed
TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = zkClient.getPartitionReassignment
-
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient,
topicAndPartition,
- Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) ==
ReassignmentCompleted
+
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient,
Map(topicAndPartition -> newReplicas))
+ .getOrElse(topicAndPartition, fail(s"Failed to get reassignment
status for $topicAndPartition")) == ReassignmentCompleted
},
"Partition reassignment should complete")
val assignedReplicas = zkClient.getReplicasForPartition(new
TopicPartition(topic, partitionToBeReassigned))
@@ -498,9 +496,8 @@ class ReassignPartitionsCommandTest extends
ZooKeeperTestHarness with Logging {
assertTrue("Partition reassignment failed for test, 0",
reassignPartitionsCommand.reassignPartitions())
// wait until reassignment is completed
TestUtils.waitUntilTrue(() => {
- val partitionsBeingReassigned = zkClient.getPartitionReassignment
-
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient,
topicAndPartition,
- Map(topicAndPartition -> newReplicas), partitionsBeingReassigned) ==
ReassignmentCompleted
+
ReassignPartitionsCommand.checkIfPartitionReassignmentSucceeded(zkClient,
Map(topicAndPartition -> newReplicas))
+ .getOrElse(topicAndPartition, fail(s"Failed to get reassignment
status for $topicAndPartition")) == ReassignmentCompleted
},
"Partition reassignment should complete")
val assignedReplicas = zkClient.getReplicasForPartition(new
TopicPartition(topic, partitionToBeReassigned))