This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new daf8a0dedac KAFKA-14595 ReassignPartitionsUnitTest rewritten in java 
(#14355)
daf8a0dedac is described below

commit daf8a0dedacceb46d6ec2bcc16181abf5963377b
Author: Nikolay <[email protected]>
AuthorDate: Sat Sep 23 04:45:14 2023 +0300

    KAFKA-14595 ReassignPartitionsUnitTest rewritten in java (#14355)
    
    This PR is part of #13247
    It contains changes to rewrite single test in java.
    Intention is reduce changes in parent PR.
    
    Reviewers: Luke Chen <[email protected]>, Taras Ledkov <[email protected]>
---
 checkstyle/import-control.xml                      |   2 +
 .../kafka/admin/ReassignPartitionsCommand.scala    |  14 +-
 .../kafka/admin/ReassignPartitionsUnitTest.scala   | 671 -----------------
 .../tools/reassign/ReassignPartitionsUnitTest.java | 790 +++++++++++++++++++++
 4 files changed, 799 insertions(+), 678 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index a12ee2ea93e..f4b5decdd88 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -292,7 +292,9 @@
     <allow pkg="scala.collection" />
 
     <subpackage name="reassign">
+      <allow pkg="org.apache.kafka.admin"/>
       <allow pkg="kafka.admin" />
+      <allow pkg="scala" />
     </subpackage>
   </subpackage>
 
diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 
b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
index 2fc36648981..205eb9ded93 100755
--- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
@@ -57,24 +57,24 @@ object ReassignPartitionsCommand extends Logging {
   private[admin] val EarliestTopicsJsonVersion = 1
 
   // Throttles that are set at the level of an individual broker.
-  private[admin] val brokerLevelLeaderThrottle =
+  val brokerLevelLeaderThrottle =
     DynamicConfig.Broker.LeaderReplicationThrottledRateProp
-  private[admin] val brokerLevelFollowerThrottle =
+  val brokerLevelFollowerThrottle =
     DynamicConfig.Broker.FollowerReplicationThrottledRateProp
-  private[admin] val brokerLevelLogDirThrottle =
+  val brokerLevelLogDirThrottle =
     DynamicConfig.Broker.ReplicaAlterLogDirsIoMaxBytesPerSecondProp
-  private[admin] val brokerLevelThrottles = Seq(
+  val brokerLevelThrottles = Seq(
     brokerLevelLeaderThrottle,
     brokerLevelFollowerThrottle,
     brokerLevelLogDirThrottle
   )
 
   // Throttles that are set at the level of an individual topic.
-  private[admin] val topicLevelLeaderThrottle =
+  val topicLevelLeaderThrottle =
     LogConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG
-  private[admin] val topicLevelFollowerThrottle =
+  val topicLevelFollowerThrottle =
     LogConfig.FOLLOWER_REPLICATION_THROTTLED_REPLICAS_CONFIG
-  private[admin] val topicLevelThrottles = Seq(
+  val topicLevelThrottles = Seq(
     topicLevelLeaderThrottle,
     topicLevelFollowerThrottle
   )
diff --git 
a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala 
b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
deleted file mode 100644
index 7aa0f8eec23..00000000000
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsUnitTest.scala
+++ /dev/null
@@ -1,671 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.admin
-
-import java.util.concurrent.ExecutionException
-import java.util.{Arrays, Collections, Optional}
-import kafka.admin.ReassignPartitionsCommand._
-import kafka.utils.Exit
-import org.apache.kafka.admin.BrokerMetadata
-import org.apache.kafka.clients.admin.{Config, MockAdminClient, 
PartitionReassignment}
-import org.apache.kafka.common.config.ConfigResource
-import org.apache.kafka.common.errors.{InvalidReplicationFactorException, 
UnknownTopicOrPartitionException}
-import org.apache.kafka.common.{Node, TopicPartition, TopicPartitionInfo, 
TopicPartitionReplica}
-import org.apache.kafka.server.common.{AdminCommandFailedException, 
AdminOperationException}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, 
assertThrows, assertTrue}
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, Timeout}
-
-import scala.collection.mutable
-import scala.jdk.CollectionConverters._
-
-@Timeout(60)
-class ReassignPartitionsUnitTest {
-
-  @BeforeEach
-  def setUp(): Unit = {
-    Exit.setExitProcedure((_, message) => throw new 
IllegalArgumentException(message.orNull))
-  }
-
-  @AfterEach
-  def tearDown(): Unit = {
-    Exit.resetExitProcedure()
-  }
-
-  @Test
-  def testCompareTopicPartitions(): Unit = {
-    assertTrue(compareTopicPartitions(new TopicPartition("abc", 0),
-      new TopicPartition("abc", 1)))
-    assertFalse(compareTopicPartitions(new TopicPartition("def", 0),
-      new TopicPartition("abc", 1)))
-  }
-
-  @Test
-  def testCompareTopicPartitionReplicas(): Unit = {
-    assertTrue(compareTopicPartitionReplicas(new TopicPartitionReplica("def", 
0, 0),
-      new TopicPartitionReplica("abc", 0, 1)))
-    assertFalse(compareTopicPartitionReplicas(new TopicPartitionReplica("def", 
0, 0),
-      new TopicPartitionReplica("cde", 0, 0)))
-  }
-
-  @Test
-  def testPartitionReassignStatesToString(): Unit = {
-    assertEquals(Seq(
-      "Status of partition reassignment:",
-      "Reassignment of partition bar-0 is still in progress.",
-      "Reassignment of partition foo-0 is completed.",
-      "Reassignment of partition foo-1 is still in progress.").
-        mkString(System.lineSeparator()),
-      partitionReassignmentStatesToString(Map(
-        new TopicPartition("foo", 0) ->
-          PartitionReassignmentState(Seq(1, 2, 3), Seq(1, 2, 3), true),
-        new TopicPartition("foo", 1) ->
-          PartitionReassignmentState(Seq(1, 2, 3), Seq(1, 2, 4), false),
-        new TopicPartition("bar", 0) ->
-          PartitionReassignmentState(Seq(1, 2, 3), Seq(1, 2, 4), false),
-      )))
-  }
-
-  private def addTopics(adminClient: MockAdminClient): Unit = {
-    val b = adminClient.brokers()
-    adminClient.addTopic(false, "foo", Arrays.asList(
-      new TopicPartitionInfo(0, b.get(0),
-        Arrays.asList(b.get(0), b.get(1), b.get(2)),
-        Arrays.asList(b.get(0), b.get(1))),
-      new TopicPartitionInfo(1, b.get(1),
-        Arrays.asList(b.get(1), b.get(2), b.get(3)),
-        Arrays.asList(b.get(1), b.get(2), b.get(3)))
-    ), Collections.emptyMap())
-    adminClient.addTopic(false, "bar", Arrays.asList(
-      new TopicPartitionInfo(0, b.get(2),
-        Arrays.asList(b.get(2), b.get(3), b.get(0)),
-        Arrays.asList(b.get(2), b.get(3), b.get(0)))
-    ), Collections.emptyMap())
-  }
-
-  @Test
-  def testFindPartitionReassignmentStates(): Unit = {
-    val adminClient = new MockAdminClient.Builder().numBrokers(4).build()
-    try {
-      addTopics(adminClient)
-      // Create a reassignment and test findPartitionReassignmentStates.
-      val reassignmentResult: Map[TopicPartition, Class[_ <: Throwable]] = 
alterPartitionReassignments(adminClient, Map(
-        new TopicPartition("foo", 0) -> Seq(0,1,3),
-        new TopicPartition("quux", 0) -> Seq(1,2,3))).map { case (k, v) => k 
-> v.getClass }.toMap
-      assertEquals(Map(new TopicPartition("quux", 0) -> 
classOf[UnknownTopicOrPartitionException]),
-        reassignmentResult)
-      assertEquals((Map(
-          new TopicPartition("foo", 0) -> 
PartitionReassignmentState(Seq(0,1,2), Seq(0,1,3), false),
-          new TopicPartition("foo", 1) -> 
PartitionReassignmentState(Seq(1,2,3), Seq(1,2,3), true)
-        ), true),
-        findPartitionReassignmentStates(adminClient, Seq(
-          (new TopicPartition("foo", 0), Seq(0,1,3)),
-          (new TopicPartition("foo", 1), Seq(1,2,3))
-        )))
-      // Cancel the reassignment and test findPartitionReassignmentStates 
again.
-      val cancelResult: Map[TopicPartition, Class[_ <: Throwable]] = 
cancelPartitionReassignments(adminClient,
-        Set(new TopicPartition("foo", 0), new TopicPartition("quux", 2))).map 
{ case (k, v) =>
-          k -> v.getClass
-        }.toMap
-      assertEquals(Map(new TopicPartition("quux", 2) -> 
classOf[UnknownTopicOrPartitionException]),
-        cancelResult)
-      assertEquals((Map(
-          new TopicPartition("foo", 0) -> 
PartitionReassignmentState(Seq(0,1,2), Seq(0,1,3), true),
-          new TopicPartition("foo", 1) -> 
PartitionReassignmentState(Seq(1,2,3), Seq(1,2,3), true)
-        ), false),
-          findPartitionReassignmentStates(adminClient, Seq(
-            (new TopicPartition("foo", 0), Seq(0,1,3)),
-            (new TopicPartition("foo", 1), Seq(1,2,3))
-          )))
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  @Test
-  def testFindLogDirMoveStates(): Unit = {
-    val adminClient = new MockAdminClient.Builder().
-      numBrokers(4).
-      brokerLogDirs(Arrays.asList(
-          Arrays.asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"),
-          Arrays.asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"),
-          Arrays.asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"),
-          Arrays.asList("/tmp/kafka-logs0", null))).
-      build();
-    try {
-      addTopics(adminClient)
-      val b = adminClient.brokers()
-      adminClient.addTopic(false, "quux", Arrays.asList(
-        new TopicPartitionInfo(0, b.get(2),
-            Arrays.asList(b.get(1), b.get(2), b.get(3)),
-            Arrays.asList(b.get(1), b.get(2), b.get(3)))),
-        Collections.emptyMap())
-      adminClient.alterReplicaLogDirs(Map(
-        new TopicPartitionReplica("foo", 0, 0) -> "/tmp/kafka-logs1",
-        new TopicPartitionReplica("quux", 0, 0) -> "/tmp/kafka-logs1"
-      ).asJava).all().get()
-      assertEquals(Map(
-        new TopicPartitionReplica("bar", 0, 0) -> new 
CompletedMoveState("/tmp/kafka-logs0"),
-        new TopicPartitionReplica("foo", 0, 0) -> new 
ActiveMoveState("/tmp/kafka-logs0",
-            "/tmp/kafka-logs1", "/tmp/kafka-logs1"),
-        new TopicPartitionReplica("foo", 1, 0) -> new 
CancelledMoveState("/tmp/kafka-logs0",
-          "/tmp/kafka-logs1"),
-        new TopicPartitionReplica("quux", 1, 0) -> new 
MissingLogDirMoveState("/tmp/kafka-logs1"),
-        new TopicPartitionReplica("quuz", 0, 0) -> new 
MissingReplicaMoveState("/tmp/kafka-logs0")
-      ), findLogDirMoveStates(adminClient, Map(
-        new TopicPartitionReplica("bar", 0, 0) -> "/tmp/kafka-logs0",
-        new TopicPartitionReplica("foo", 0, 0) -> "/tmp/kafka-logs1",
-        new TopicPartitionReplica("foo", 1, 0) -> "/tmp/kafka-logs1",
-        new TopicPartitionReplica("quux", 1, 0) -> "/tmp/kafka-logs1",
-        new TopicPartitionReplica("quuz", 0, 0) -> "/tmp/kafka-logs0"
-      )))
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  @Test
-  def testReplicaMoveStatesToString(): Unit = {
-    assertEquals(Seq(
-      "Reassignment of replica bar-0-0 completed successfully.",
-      "Reassignment of replica foo-0-0 is still in progress.",
-      "Partition foo-1 on broker 0 is not being moved from log dir 
/tmp/kafka-logs0 to /tmp/kafka-logs1.",
-      "Partition quux-0 cannot be found in any live log directory on broker 
0.",
-      "Partition quux-1 on broker 1 is being moved to log dir /tmp/kafka-logs2 
instead of /tmp/kafka-logs1.",
-      "Partition quux-2 is not found in any live log dir on broker 1. " +
-          "There is likely an offline log directory on the 
broker.").mkString(System.lineSeparator()),
-        replicaMoveStatesToString(Map(
-          new TopicPartitionReplica("bar", 0, 0) -> 
CompletedMoveState("/tmp/kafka-logs0"),
-          new TopicPartitionReplica("foo", 0, 0) -> 
ActiveMoveState("/tmp/kafka-logs0",
-            "/tmp/kafka-logs1", "/tmp/kafka-logs1"),
-          new TopicPartitionReplica("foo", 1, 0) -> 
CancelledMoveState("/tmp/kafka-logs0",
-            "/tmp/kafka-logs1"),
-          new TopicPartitionReplica("quux", 0, 0) -> 
MissingReplicaMoveState("/tmp/kafka-logs1"),
-          new TopicPartitionReplica("quux", 1, 1) -> 
ActiveMoveState("/tmp/kafka-logs0",
-            "/tmp/kafka-logs1", "/tmp/kafka-logs2"),
-          new TopicPartitionReplica("quux", 2, 1) -> 
MissingLogDirMoveState("/tmp/kafka-logs1")
-        )))
-  }
-
-  @Test
-  def testGetReplicaAssignments(): Unit = {
-    val adminClient = new MockAdminClient.Builder().numBrokers(4).build()
-    try {
-      addTopics(adminClient)
-      assertEquals(Map(
-          new TopicPartition("foo", 0) -> Seq(0, 1, 2),
-          new TopicPartition("foo", 1) -> Seq(1, 2, 3),
-        ),
-        getReplicaAssignmentForTopics(adminClient, Seq("foo")))
-      assertEquals(Map(
-          new TopicPartition("foo", 0) -> Seq(0, 1, 2),
-          new TopicPartition("bar", 0) -> Seq(2, 3, 0),
-        ),
-        getReplicaAssignmentForPartitions(adminClient, Set(
-          new TopicPartition("foo", 0), new TopicPartition("bar", 0))))
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  @Test
-  def testGetBrokerRackInformation(): Unit = {
-    val adminClient = new MockAdminClient.Builder().
-      brokers(Arrays.asList(new Node(0, "localhost", 9092, "rack0"),
-        new Node(1, "localhost", 9093, "rack1"),
-        new Node(2, "localhost", 9094, null))).
-      build()
-    try {
-      assertEquals(Seq(
-        new BrokerMetadata(0, Optional.of("rack0")),
-        new BrokerMetadata(1, Optional.of("rack1"))
-      ), getBrokerMetadata(adminClient, Seq(0, 1), true))
-      assertEquals(Seq(
-        new BrokerMetadata(0, Optional.empty()),
-        new BrokerMetadata(1, Optional.empty())
-      ), getBrokerMetadata(adminClient, Seq(0, 1), false))
-      assertStartsWith("Not all brokers have rack information",
-        assertThrows(classOf[AdminOperationException],
-          () => getBrokerMetadata(adminClient, Seq(1, 2), true)).getMessage)
-      assertEquals(Seq(
-        new BrokerMetadata(1, Optional.empty()),
-        new BrokerMetadata(2, Optional.empty())
-      ), getBrokerMetadata(adminClient, Seq(1, 2), false))
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  @Test
-  def testParseGenerateAssignmentArgs(): Unit = {
-    assertStartsWith("Broker list contains duplicate entries",
-      assertThrows(classOf[AdminCommandFailedException], () => 
parseGenerateAssignmentArgs(
-          """{"topics": [{"topic": "foo"}], "version":1}""", "1,1,2"),
-        () => "Expected to detect duplicate broker list entries").getMessage)
-    assertStartsWith("Broker list contains duplicate entries",
-      assertThrows(classOf[AdminCommandFailedException], () => 
parseGenerateAssignmentArgs(
-          """{"topics": [{"topic": "foo"}], "version":1}""", "5,2,3,4,5"),
-        () => "Expected to detect duplicate broker list entries").getMessage)
-    assertEquals((Seq(5,2,3,4),Seq("foo")),
-      parseGenerateAssignmentArgs("""{"topics": [{"topic": "foo"}], 
"version":1}""",
-        "5,2,3,4"))
-    assertStartsWith("List of topics to reassign contains duplicate entries",
-      assertThrows(classOf[AdminCommandFailedException], () => 
parseGenerateAssignmentArgs(
-          """{"topics": [{"topic": "foo"},{"topic": "foo"}], "version":1}""", 
"5,2,3,4"),
-        () => "Expected to detect duplicate topic entries").getMessage)
-    assertEquals((Seq(5,3,4),Seq("foo","bar")),
-      parseGenerateAssignmentArgs(
-        """{"topics": [{"topic": "foo"},{"topic": "bar"}], "version":1}""",
-        "5,3,4"))
-  }
-
-  @Test
-  def testGenerateAssignmentFailsWithoutEnoughReplicas(): Unit = {
-    val adminClient = new MockAdminClient.Builder().numBrokers(4).build()
-    try {
-      addTopics(adminClient)
-      assertStartsWith("Replication factor: 3 larger than available brokers: 
2",
-        assertThrows(classOf[InvalidReplicationFactorException],
-          () => generateAssignment(adminClient, 
"""{"topics":[{"topic":"foo"},{"topic":"bar"}]}""", "0,1", false),
-          () => "Expected generateAssignment to fail").getMessage)
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  @Test
-  def testGenerateAssignmentWithInvalidPartitionsFails(): Unit = {
-    val adminClient = new MockAdminClient.Builder().numBrokers(5).build()
-    try {
-      addTopics(adminClient)
-      assertStartsWith("Topic quux not found",
-        assertThrows(classOf[ExecutionException],
-          () => generateAssignment(adminClient, 
"""{"topics":[{"topic":"foo"},{"topic":"quux"}]}""", "0,1", false),
-          () => "Expected generateAssignment to fail").getCause.getMessage)
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  @Test
-  def testGenerateAssignmentWithInconsistentRacks(): Unit = {
-    val adminClient = new MockAdminClient.Builder().
-      brokers(Arrays.asList(
-        new Node(0, "localhost", 9092, "rack0"),
-        new Node(1, "localhost", 9093, "rack0"),
-        new Node(2, "localhost", 9094, null),
-        new Node(3, "localhost", 9095, "rack1"),
-        new Node(4, "localhost", 9096, "rack1"),
-        new Node(5, "localhost", 9097, "rack2"))).
-      build()
-    try {
-      addTopics(adminClient)
-      assertStartsWith("Not all brokers have rack information.",
-        assertThrows(classOf[AdminOperationException],
-          () => generateAssignment(adminClient, 
"""{"topics":[{"topic":"foo"}]}""", "0,1,2,3", true),
-          () => "Expected generateAssignment to fail").getMessage)
-      // It should succeed when --disable-rack-aware is used.
-      val (_, current) = generateAssignment(adminClient,
-        """{"topics":[{"topic":"foo"}]}""", "0,1,2,3", false)
-      assertEquals(Map(
-        new TopicPartition("foo", 0) -> Seq(0, 1, 2),
-        new TopicPartition("foo", 1) -> Seq(1, 2, 3),
-      ), current)
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  @Test
-  def testGenerateAssignmentWithFewerBrokers(): Unit = {
-    val adminClient = new MockAdminClient.Builder().numBrokers(4).build()
-    try {
-      addTopics(adminClient)
-      val goalBrokers = Set(0,1,3)
-      val (proposed, current) = generateAssignment(adminClient,
-        """{"topics":[{"topic":"foo"},{"topic":"bar"}]}""",
-        goalBrokers.mkString(","), false)
-      assertEquals(Map(
-        new TopicPartition("foo", 0) -> Seq(0, 1, 2),
-        new TopicPartition("foo", 1) -> Seq(1, 2, 3),
-        new TopicPartition("bar", 0) -> Seq(2, 3, 0)
-      ), current)
-
-      // The proposed assignment should only span the provided brokers
-      proposed.values.foreach(replicas => 
assertTrue(replicas.forall(goalBrokers.contains),
-        s"Proposed assignment $proposed puts replicas on brokers other than 
$goalBrokers"))
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  @Test
-  def testCurrentPartitionReplicaAssignmentToString(): Unit = {
-    assertEquals(Seq(
-        """Current partition replica assignment""",
-        """""",
-        """{"version":1,"partitions":""" +
-          
"""[{"topic":"bar","partition":0,"replicas":[7,8],"log_dirs":["any","any"]},""" 
+
-          
"""{"topic":"foo","partition":1,"replicas":[4,5,6],"log_dirs":["any","any","any"]}]"""
 +
-        """}""",
-        """""",
-        """Save this to use as the --reassignment-json-file option during 
rollback"""
-      ).mkString(System.lineSeparator()),
-      currentPartitionReplicaAssignmentToString(Map(
-          new TopicPartition("foo", 1) -> Seq(1,2,3),
-          new TopicPartition("bar", 0) -> Seq(7,8,9)
-        ),
-        Map(
-          new TopicPartition("foo", 0) -> Seq(1,2,3),
-          new TopicPartition("foo", 1) -> Seq(4,5,6),
-          new TopicPartition("bar", 0) -> Seq(7,8),
-          new TopicPartition("baz", 0) -> Seq(10,11,12)
-        ),
-      ))
-  }
-
-  @Test
-  def testMoveMap(): Unit = {
-    // overwrite foo-0 with different reassignments
-    // keep old reassignments of foo-1
-    // overwrite foo-2 with same reassignments
-    // overwrite foo-3 with new reassignments without overlap of old 
reassignments
-    // overwrite foo-4 with a subset of old reassignments
-    // overwrite foo-5 with a superset of old reassignments
-    // add new reassignments to bar-0
-    val moveMap = calculateProposedMoveMap(Map(
-      new TopicPartition("foo", 0) -> new PartitionReassignment(
-        Arrays.asList(1,2,3,4), Arrays.asList(4), Arrays.asList(3)),
-      new TopicPartition("foo", 1) -> new PartitionReassignment(
-        Arrays.asList(4,5,6,7,8), Arrays.asList(7, 8), Arrays.asList(4, 5)),
-      new TopicPartition("foo", 2) -> new PartitionReassignment(
-        Arrays.asList(1,2,3,4), Arrays.asList(3,4), Arrays.asList(1,2)),
-      new TopicPartition("foo", 3) -> new PartitionReassignment(
-        Arrays.asList(1,2,3,4), Arrays.asList(3,4), Arrays.asList(1,2)),
-      new TopicPartition("foo", 4) -> new PartitionReassignment(
-        Arrays.asList(1,2,3,4), Arrays.asList(3,4), Arrays.asList(1,2)),
-      new TopicPartition("foo", 5) -> new PartitionReassignment(
-        Arrays.asList(1,2,3,4), Arrays.asList(3,4), Arrays.asList(1,2))
-    ), Map(
-      new TopicPartition("foo", 0) -> Seq(1,2,5),
-      new TopicPartition("foo", 2) -> Seq(3,4),
-      new TopicPartition("foo", 3) -> Seq(5,6),
-      new TopicPartition("foo", 4) -> Seq(3),
-      new TopicPartition("foo", 5) -> Seq(3,4,5,6),
-      new TopicPartition("bar", 0) -> Seq(1,2,3)
-    ), Map(
-      new TopicPartition("foo", 0) -> Seq(1,2,3,4),
-      new TopicPartition("foo", 1) -> Seq(4,5,6,7,8),
-      new TopicPartition("foo", 2) -> Seq(1,2,3,4),
-      new TopicPartition("foo", 3) -> Seq(1,2,3,4),
-      new TopicPartition("foo", 4) -> Seq(1,2,3,4),
-      new TopicPartition("foo", 5) -> Seq(1,2,3,4),
-      new TopicPartition("bar", 0) -> Seq(2,3,4),
-      new TopicPartition("baz", 0) -> Seq(1,2,3)
-    ))
-
-    assertEquals(
-      mutable.Map("foo" -> mutable.Map(
-        0 -> PartitionMove(mutable.Set(1,2,3), mutable.Set(5)),
-        1 -> PartitionMove(mutable.Set(4,5,6), mutable.Set(7,8)),
-        2 -> PartitionMove(mutable.Set(1,2), mutable.Set(3,4)),
-        3 -> PartitionMove(mutable.Set(1,2), mutable.Set(5,6)),
-        4 -> PartitionMove(mutable.Set(1,2), mutable.Set(3)),
-        5 -> PartitionMove(mutable.Set(1,2), mutable.Set(3,4,5,6))
-      ), "bar" -> mutable.Map(
-        0 -> PartitionMove(mutable.Set(2,3,4), mutable.Set(1)),
-      )), moveMap)
-
-    assertEquals(Map(
-        "foo" -> "0:1,0:2,0:3,1:4,1:5,1:6,2:1,2:2,3:1,3:2,4:1,4:2,5:1,5:2",
-        "bar" -> "0:2,0:3,0:4"
-      ), calculateLeaderThrottles(moveMap))
-
-    assertEquals(Map(
-        "foo" -> "0:5,1:7,1:8,2:3,2:4,3:5,3:6,4:3,5:3,5:4,5:5,5:6",
-        "bar" -> "0:1"
-      ), calculateFollowerThrottles(moveMap))
-
-    assertEquals(Set(1,2,3,4,5,6,7,8), calculateReassigningBrokers(moveMap))
-
-    assertEquals(Set(0,2), calculateMovingBrokers(
-      Set(new TopicPartitionReplica("quux", 0, 0),
-          new TopicPartitionReplica("quux", 1, 2))))
-  }
-
-  @Test
-  def testParseExecuteAssignmentArgs(): Unit = {
-    assertStartsWith("Partition reassignment list cannot be empty",
-      assertThrows(classOf[AdminCommandFailedException],
-        () => parseExecuteAssignmentArgs("""{"version":1,"partitions":[]}"""),
-        () => "Expected to detect empty partition reassignment 
list").getMessage)
-    assertStartsWith("Partition reassignment contains duplicate topic 
partitions",
-      assertThrows(classOf[AdminCommandFailedException], () => 
parseExecuteAssignmentArgs(
-          """{"version":1,"partitions":""" +
-            
"""[{"topic":"foo","partition":0,"replicas":[0,1],"log_dirs":["any","any"]},""" 
+
-            
"""{"topic":"foo","partition":0,"replicas":[2,3,4],"log_dirs":["any","any","any"]}"""
 +
-            """]}"""), () => "Expected to detect a partition list with 
duplicate entries").getMessage)
-    assertStartsWith("Partition reassignment contains duplicate topic 
partitions",
-      assertThrows(classOf[AdminCommandFailedException], () => 
parseExecuteAssignmentArgs(
-          """{"version":1,"partitions":""" +
-            
"""[{"topic":"foo","partition":0,"replicas":[0,1],"log_dirs":["/abc","/def"]},"""
 +
-            
"""{"topic":"foo","partition":0,"replicas":[2,3],"log_dirs":["/abc","/def"]}""" 
+
-            """]}"""), () => "Expected to detect a partition replica list with 
duplicate entries").getMessage)
-    assertStartsWith("Partition replica lists may not contain duplicate 
entries",
-      assertThrows(classOf[AdminCommandFailedException], () => 
parseExecuteAssignmentArgs(
-          """{"version":1,"partitions":""" +
-            
"""[{"topic":"foo","partition":0,"replicas":[0,0],"log_dirs":["/abc","/def"]},"""
 +
-            
"""{"topic":"foo","partition":1,"replicas":[2,3],"log_dirs":["/abc","/def"]}""" 
+
-            """]}"""), () => "Expected to detect a partition replica list with 
duplicate entries").getMessage)
-    assertEquals((Map(
-        new TopicPartition("foo", 0) -> Seq(1, 2, 3),
-        new TopicPartition("foo", 1) -> Seq(3, 4, 5),
-      ), Map(
-      )),
-      parseExecuteAssignmentArgs(
-        """{"version":1,"partitions":""" +
-          
"""[{"topic":"foo","partition":0,"replicas":[1,2,3],"log_dirs":["any","any","any"]},"""
 +
-          
"""{"topic":"foo","partition":1,"replicas":[3,4,5],"log_dirs":["any","any","any"]}"""
 +
-          """]}"""))
-    assertEquals((Map(
-      new TopicPartition("foo", 0) -> Seq(1, 2, 3),
-    ), Map(
-      new TopicPartitionReplica("foo", 0, 1) -> "/tmp/a",
-      new TopicPartitionReplica("foo", 0, 2) -> "/tmp/b",
-      new TopicPartitionReplica("foo", 0, 3) -> "/tmp/c"
-    )),
-      parseExecuteAssignmentArgs(
-        """{"version":1,"partitions":""" +
-          
"""[{"topic":"foo","partition":0,"replicas":[1,2,3],"log_dirs":["/tmp/a","/tmp/b","/tmp/c"]}"""
 +
-          """]}"""))
-  }
-
-  @Test
-  def testExecuteWithInvalidPartitionsFails(): Unit = {
-    val adminClient = new MockAdminClient.Builder().numBrokers(5).build()
-    try {
-      addTopics(adminClient)
-      assertStartsWith("Topic quux not found",
-        assertThrows(classOf[ExecutionException], () => 
executeAssignment(adminClient, false,
-            """{"version":1,"partitions":""" +
-              
"""[{"topic":"foo","partition":0,"replicas":[0,1],"log_dirs":["any","any"]},""" 
+
-              
"""{"topic":"quux","partition":0,"replicas":[2,3,4],"log_dirs":["any","any","any"]}"""
 +
-              """]}"""), () => "Expected reassignment with non-existent topic 
to fail").getCause.getMessage)
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  @Test
-  def testExecuteWithInvalidBrokerIdFails(): Unit = {
-    val adminClient = new MockAdminClient.Builder().numBrokers(4).build()
-    try {
-      addTopics(adminClient)
-      assertStartsWith("Unknown broker id 4",
-        assertThrows(classOf[AdminCommandFailedException], () => 
executeAssignment(adminClient, false,
-            """{"version":1,"partitions":""" +
-              
"""[{"topic":"foo","partition":0,"replicas":[0,1],"log_dirs":["any","any"]},""" 
+
-              
"""{"topic":"foo","partition":1,"replicas":[2,3,4],"log_dirs":["any","any","any"]}"""
 +
-              """]}"""), () => "Expected reassignment with non-existent broker 
id to fail").getMessage)
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  @Test
-  def testModifyBrokerInterBrokerThrottle(): Unit = {
-    val adminClient = new MockAdminClient.Builder().numBrokers(4).build()
-    try {
-      modifyInterBrokerThrottle(adminClient, Set(0, 1, 2), 1000)
-      modifyInterBrokerThrottle(adminClient, Set(0, 3), 100)
-      val brokers = Seq(0, 1, 2, 3).map(
-        id => new ConfigResource(ConfigResource.Type.BROKER, id.toString))
-      val results = adminClient.describeConfigs(brokers.asJava).all().get()
-      verifyBrokerThrottleResults(results.get(brokers(0)), 100, -1)
-      verifyBrokerThrottleResults(results.get(brokers(1)), 1000, -1)
-      verifyBrokerThrottleResults(results.get(brokers(2)), 1000, -1)
-      verifyBrokerThrottleResults(results.get(brokers(3)), 100, -1)
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  @Test
-  def testModifyLogDirThrottle(): Unit = {
-    val adminClient = new MockAdminClient.Builder().numBrokers(4).build()
-    try {
-      modifyLogDirThrottle(adminClient, Set(0, 1, 2), 2000)
-      modifyLogDirThrottle(adminClient, Set(0, 3), -1)
-      val brokers = Seq(0, 1, 2, 3).map(
-        id => new ConfigResource(ConfigResource.Type.BROKER, id.toString))
-      val results = adminClient.describeConfigs(brokers.asJava).all().get()
-      verifyBrokerThrottleResults(results.get(brokers(0)), -1, 2000)
-      verifyBrokerThrottleResults(results.get(brokers(1)), -1, 2000)
-      verifyBrokerThrottleResults(results.get(brokers(2)), -1, 2000)
-      verifyBrokerThrottleResults(results.get(brokers(3)), -1, -1)
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  @Test
-  def testCurReassignmentsToString(): Unit = {
-    val adminClient = new MockAdminClient.Builder().numBrokers(4).build()
-    try {
-      addTopics(adminClient)
-      assertEquals("No partition reassignments found.", 
curReassignmentsToString(adminClient))
-      val reassignmentResult: Map[TopicPartition, Class[_ <: Throwable]] = 
alterPartitionReassignments(adminClient,
-        Map(
-          new TopicPartition("foo", 1) -> Seq(4,5,3),
-          new TopicPartition("foo", 0) -> Seq(0,1,4,2),
-          new TopicPartition("bar", 0) -> Seq(2,3)
-        )
-      ).map { case (k, v) => k -> v.getClass }.toMap
-      assertEquals(Map(), reassignmentResult)
-      assertEquals(Seq("Current partition reassignments:",
-                       "bar-0: replicas: 2,3,0. removing: 0.",
-                       "foo-0: replicas: 0,1,2. adding: 4.",
-                       "foo-1: replicas: 1,2,3. adding: 4,5. removing: 
1,2.").mkString(System.lineSeparator()),
-                  curReassignmentsToString(adminClient))
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  private def verifyBrokerThrottleResults(config: Config,
-                                          expectedInterBrokerThrottle: Long,
-                                          expectedReplicaAlterLogDirsThrottle: 
Long): Unit = {
-    val configs = new mutable.HashMap[String, String]
-    config.entries.forEach(entry => configs.put(entry.name, entry.value))
-    if (expectedInterBrokerThrottle >= 0) {
-      assertEquals(expectedInterBrokerThrottle.toString,
-        configs.getOrElse(brokerLevelLeaderThrottle, ""))
-      assertEquals(expectedInterBrokerThrottle.toString,
-        configs.getOrElse(brokerLevelFollowerThrottle, ""))
-    }
-    if (expectedReplicaAlterLogDirsThrottle >= 0) {
-      assertEquals(expectedReplicaAlterLogDirsThrottle.toString,
-        configs.getOrElse(brokerLevelLogDirThrottle, ""))
-    }
-  }
-
-  @Test
-  def testModifyTopicThrottles(): Unit = {
-    val adminClient = new MockAdminClient.Builder().numBrokers(4).build()
-    try {
-      addTopics(adminClient)
-      modifyTopicThrottles(adminClient,
-        Map("foo" -> "leaderFoo", "bar" -> "leaderBar"),
-        Map("bar" -> "followerBar"))
-      val topics = Seq("bar", "foo").map(
-        id => new ConfigResource(ConfigResource.Type.TOPIC, id))
-      val results = adminClient.describeConfigs(topics.asJava).all().get()
-      verifyTopicThrottleResults(results.get(topics(0)), "leaderBar", 
"followerBar")
-      verifyTopicThrottleResults(results.get(topics(1)), "leaderFoo", "")
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  private def verifyTopicThrottleResults(config: Config,
-                                         expectedLeaderThrottle: String,
-                                         expectedFollowerThrottle: String): 
Unit = {
-    val configs = new mutable.HashMap[String, String]
-    config.entries.forEach(entry => configs.put(entry.name, entry.value))
-    assertEquals(expectedLeaderThrottle,
-      configs.getOrElse(topicLevelLeaderThrottle, ""))
-    assertEquals(expectedFollowerThrottle,
-      configs.getOrElse(topicLevelFollowerThrottle, ""))
-  }
-
-  @Test
-  def testAlterReplicaLogDirs(): Unit = {
-    val adminClient = new MockAdminClient.Builder().
-      numBrokers(4).
-      brokerLogDirs(Collections.nCopies(4,
-        Arrays.asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"))).
-      build()
-    try {
-      addTopics(adminClient)
-      assertEquals(Set(
-          new TopicPartitionReplica("foo", 0, 0)
-        ),
-        alterReplicaLogDirs(adminClient, Map(
-          new TopicPartitionReplica("foo", 0, 0) -> "/tmp/kafka-logs1",
-          new TopicPartitionReplica("quux", 1, 0) -> "/tmp/kafka-logs1"
-        )))
-    } finally {
-      adminClient.close()
-    }
-  }
-
-  def assertStartsWith(prefix: String, str: String): Unit = {
-    assertTrue(str.startsWith(prefix), "Expected the string to start with %s, 
but it was %s".format(prefix, str))
-  }
-
-  @Test
-  def testPropagateInvalidJsonError(): Unit = {
-    val adminClient = new MockAdminClient.Builder().numBrokers(4).build()
-    try {
-      addTopics(adminClient)
-      assertStartsWith("Unexpected character",
-        assertThrows(classOf[AdminOperationException], () => 
executeAssignment(adminClient, additional = false, "{invalid_json")).getMessage)
-    } finally {
-      adminClient.close()
-    }
-  }
-}
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
new file mode 100644
index 00000000000..4639744226d
--- /dev/null
+++ 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
@@ -0,0 +1,790 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools.reassign;
+
+import kafka.admin.ReassignPartitionsCommand;
+import org.apache.kafka.admin.BrokerMetadata;
+import org.apache.kafka.clients.admin.Config;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.clients.admin.PartitionReassignment;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.TopicPartitionReplica;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import scala.Tuple2;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
kafka.admin.ReassignPartitionsCommand.alterPartitionReassignments;
+import static kafka.admin.ReassignPartitionsCommand.alterReplicaLogDirs;
+import static 
kafka.admin.ReassignPartitionsCommand.brokerLevelFollowerThrottle;
+import static kafka.admin.ReassignPartitionsCommand.brokerLevelLeaderThrottle;
+import static kafka.admin.ReassignPartitionsCommand.brokerLevelLogDirThrottle;
+import static kafka.admin.ReassignPartitionsCommand.calculateFollowerThrottles;
+import static kafka.admin.ReassignPartitionsCommand.calculateLeaderThrottles;
+import static kafka.admin.ReassignPartitionsCommand.calculateMovingBrokers;
+import static kafka.admin.ReassignPartitionsCommand.calculateProposedMoveMap;
+import static 
kafka.admin.ReassignPartitionsCommand.calculateReassigningBrokers;
+import static 
kafka.admin.ReassignPartitionsCommand.cancelPartitionReassignments;
+import static 
kafka.admin.ReassignPartitionsCommand.compareTopicPartitionReplicas;
+import static kafka.admin.ReassignPartitionsCommand.compareTopicPartitions;
+import static kafka.admin.ReassignPartitionsCommand.curReassignmentsToString;
+import static 
kafka.admin.ReassignPartitionsCommand.currentPartitionReplicaAssignmentToString;
+import static kafka.admin.ReassignPartitionsCommand.executeAssignment;
+import static kafka.admin.ReassignPartitionsCommand.findLogDirMoveStates;
+import static 
kafka.admin.ReassignPartitionsCommand.findPartitionReassignmentStates;
+import static kafka.admin.ReassignPartitionsCommand.generateAssignment;
+import static kafka.admin.ReassignPartitionsCommand.getBrokerMetadata;
+import static 
kafka.admin.ReassignPartitionsCommand.getReplicaAssignmentForPartitions;
+import static 
kafka.admin.ReassignPartitionsCommand.getReplicaAssignmentForTopics;
+import static kafka.admin.ReassignPartitionsCommand.modifyInterBrokerThrottle;
+import static kafka.admin.ReassignPartitionsCommand.modifyLogDirThrottle;
+import static kafka.admin.ReassignPartitionsCommand.modifyTopicThrottles;
+import static kafka.admin.ReassignPartitionsCommand.parseExecuteAssignmentArgs;
+import static 
kafka.admin.ReassignPartitionsCommand.parseGenerateAssignmentArgs;
+import static 
kafka.admin.ReassignPartitionsCommand.partitionReassignmentStatesToString;
+import static kafka.admin.ReassignPartitionsCommand.replicaMoveStatesToString;
+import static kafka.admin.ReassignPartitionsCommand.topicLevelFollowerThrottle;
+import static kafka.admin.ReassignPartitionsCommand.topicLevelLeaderThrottle;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(60)
+public class ReassignPartitionsUnitTest {
+    @BeforeAll
+    public static void setUp() {
+        Exit.setExitProcedure((statusCode, message) -> {
+            throw new IllegalArgumentException(message);
+        });
+    }
+
+    @AfterAll
+    public static void tearDown() {
+        Exit.resetExitProcedure();
+    }
+
+    @Test
+    public void testCompareTopicPartitions() {
+        assertTrue(compareTopicPartitions(new TopicPartition("abc", 0),
+            new TopicPartition("abc", 1)));
+        assertFalse(compareTopicPartitions(new TopicPartition("def", 0),
+            new TopicPartition("abc", 1)));
+    }
+
+    @Test
+    public void testCompareTopicPartitionReplicas() {
+        assertTrue(compareTopicPartitionReplicas(new 
TopicPartitionReplica("def", 0, 0),
+            new TopicPartitionReplica("abc", 0, 1)));
+        assertFalse(compareTopicPartitionReplicas(new 
TopicPartitionReplica("def", 0, 0),
+            new TopicPartitionReplica("cde", 0, 0)));
+    }
+
+    @Test
+    public void testPartitionReassignStatesToString() {
+        Map<TopicPartition, 
ReassignPartitionsCommand.PartitionReassignmentState> states = new HashMap<>();
+
+        states.put(new TopicPartition("foo", 0),
+            new ReassignPartitionsCommand.PartitionReassignmentState(seq(1, 2, 
3), seq(1, 2, 3), true));
+        states.put(new TopicPartition("foo", 1),
+            new ReassignPartitionsCommand.PartitionReassignmentState(seq(1, 2, 
3), seq(1, 2, 4), false));
+        states.put(new TopicPartition("bar", 0),
+            new ReassignPartitionsCommand.PartitionReassignmentState(seq(1, 2, 
3), seq(1, 2, 4), false));
+
+        assertEquals(String.join(System.lineSeparator(),
+            "Status of partition reassignment:",
+            "Reassignment of partition bar-0 is still in progress.",
+            "Reassignment of partition foo-0 is completed.",
+            "Reassignment of partition foo-1 is still in progress."),
+            partitionReassignmentStatesToString(asScala(states)));
+    }
+
+    private void addTopics(MockAdminClient adminClient) {
+        List<Node> b = adminClient.brokers();
+        adminClient.addTopic(false, "foo", Arrays.asList(
+            new TopicPartitionInfo(0, b.get(0),
+                Arrays.asList(b.get(0), b.get(1), b.get(2)),
+                Arrays.asList(b.get(0), b.get(1))),
+            new TopicPartitionInfo(1, b.get(1),
+                Arrays.asList(b.get(1), b.get(2), b.get(3)),
+                Arrays.asList(b.get(1), b.get(2), b.get(3)))
+        ), Collections.emptyMap());
+        adminClient.addTopic(false, "bar", Arrays.asList(
+            new TopicPartitionInfo(0, b.get(2),
+                Arrays.asList(b.get(2), b.get(3), b.get(0)),
+                Arrays.asList(b.get(2), b.get(3), b.get(0)))
+        ), Collections.emptyMap());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testFindPartitionReassignmentStates() {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(4).build()) {
+            addTopics(adminClient);
+            // Create a reassignment and test findPartitionReassignmentStates.
+            Map<TopicPartition, Seq<Object>> reassignments = new HashMap<>();
+
+            reassignments.put(new TopicPartition("foo", 0), seq(0, 1, 3));
+            reassignments.put(new TopicPartition("quux", 0), seq(1, 2, 3));
+
+            scala.collection.Map<TopicPartition, Throwable> reassignmentResult 
= alterPartitionReassignments(adminClient,
+                asScala(reassignments));
+
+            assertEquals(1, reassignmentResult.size());
+            assertEquals(UnknownTopicOrPartitionException.class, 
reassignmentResult.get(new TopicPartition("quux", 0)).get().getClass());
+
+            Map<TopicPartition, 
ReassignPartitionsCommand.PartitionReassignmentState> expStates = new 
HashMap<>();
+
+            expStates.put(new TopicPartition("foo", 0),
+                new 
ReassignPartitionsCommand.PartitionReassignmentState(seq(0, 1, 2), seq(0, 1, 
3), false));
+            expStates.put(new TopicPartition("foo", 1),
+                new 
ReassignPartitionsCommand.PartitionReassignmentState(seq(1, 2, 3), seq(1, 2, 
3), true));
+
+            Tuple2<scala.collection.Map<TopicPartition, 
ReassignPartitionsCommand.PartitionReassignmentState>, Object> actual =
+                findPartitionReassignmentStates(adminClient, seq(
+                    new Tuple2<>(new TopicPartition("foo", 0), seq(0, 1, 3)),
+                    new Tuple2<>(new TopicPartition("foo", 1), seq(1, 2, 3))
+                ));
+
+            assertEquals(asScala(expStates), actual._1);
+            assertTrue((Boolean) actual._2);
+
+            // Cancel the reassignment and test 
findPartitionReassignmentStates again.
+            scala.collection.Map<TopicPartition, Throwable> cancelResult = 
cancelPartitionReassignments(adminClient,
+                set(new TopicPartition("foo", 0), new TopicPartition("quux", 
2)));
+
+            assertEquals(1, cancelResult.size());
+            assertEquals(UnknownTopicOrPartitionException.class, 
cancelResult.get(new TopicPartition("quux", 2)).get().getClass());
+
+            expStates.clear();
+
+            expStates.put(new TopicPartition("foo", 0),
+                new 
ReassignPartitionsCommand.PartitionReassignmentState(seq(0, 1, 2), seq(0, 1, 
3), true));
+            expStates.put(new TopicPartition("foo", 1),
+                new 
ReassignPartitionsCommand.PartitionReassignmentState(seq(1, 2, 3), seq(1, 2, 
3), true));
+
+            actual = findPartitionReassignmentStates(adminClient, seq(
+                new Tuple2<>(new TopicPartition("foo", 0), seq(0, 1, 3)),
+                new Tuple2<>(new TopicPartition("foo", 1), seq(1, 2, 3))
+            ));
+
+            assertEquals(asScala(expStates), actual._1);
+            assertFalse((Boolean) actual._2);
+        }
+    }
+
+    @Test
+    public void testFindLogDirMoveStates() throws Exception {
+        try (MockAdminClient adminClient = new MockAdminClient.Builder().
+                numBrokers(4).
+                brokerLogDirs(Arrays.asList(
+                    Arrays.asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"),
+                    Arrays.asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"),
+                    Arrays.asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"),
+                    Arrays.asList("/tmp/kafka-logs0", null)))
+                .build()) {
+
+            addTopics(adminClient);
+            List<Node> b = adminClient.brokers();
+            adminClient.addTopic(false, "quux", Arrays.asList(
+                    new TopicPartitionInfo(0, b.get(2),
+                        Arrays.asList(b.get(1), b.get(2), b.get(3)),
+                        Arrays.asList(b.get(1), b.get(2), b.get(3)))),
+                Collections.emptyMap());
+
+            Map<TopicPartitionReplica, String> replicaAssignment = new 
HashMap<>();
+
+            replicaAssignment.put(new TopicPartitionReplica("foo", 0, 0), 
"/tmp/kafka-logs1");
+            replicaAssignment.put(new TopicPartitionReplica("quux", 0, 0), 
"/tmp/kafka-logs1");
+
+            adminClient.alterReplicaLogDirs(replicaAssignment).all().get();
+
+            Map<TopicPartitionReplica, 
ReassignPartitionsCommand.LogDirMoveState> states = new HashMap<>();
+
+            states.put(new TopicPartitionReplica("bar", 0, 0), new 
ReassignPartitionsCommand.CompletedMoveState("/tmp/kafka-logs0"));
+            states.put(new TopicPartitionReplica("foo", 0, 0), new 
ReassignPartitionsCommand.ActiveMoveState("/tmp/kafka-logs0",
+                "/tmp/kafka-logs1", "/tmp/kafka-logs1"));
+            states.put(new TopicPartitionReplica("foo", 1, 0), new 
ReassignPartitionsCommand.CancelledMoveState("/tmp/kafka-logs0",
+                "/tmp/kafka-logs1"));
+            states.put(new TopicPartitionReplica("quux", 1, 0), new 
ReassignPartitionsCommand.MissingLogDirMoveState("/tmp/kafka-logs1"));
+            states.put(new TopicPartitionReplica("quuz", 0, 0), new 
ReassignPartitionsCommand.MissingReplicaMoveState("/tmp/kafka-logs0"));
+
+            Map<TopicPartitionReplica, String> targetMoves = new HashMap<>();
+
+            targetMoves.put(new TopicPartitionReplica("bar", 0, 0), 
"/tmp/kafka-logs0");
+            targetMoves.put(new TopicPartitionReplica("foo", 0, 0), 
"/tmp/kafka-logs1");
+            targetMoves.put(new TopicPartitionReplica("foo", 1, 0), 
"/tmp/kafka-logs1");
+            targetMoves.put(new TopicPartitionReplica("quux", 1, 0), 
"/tmp/kafka-logs1");
+            targetMoves.put(new TopicPartitionReplica("quuz", 0, 0), 
"/tmp/kafka-logs0");
+
+            assertEquals(asScala(states), findLogDirMoveStates(adminClient, 
asScala(targetMoves)));
+        }
+    }
+
+    @Test
+    public void testReplicaMoveStatesToString() {
+        Map<TopicPartitionReplica, ReassignPartitionsCommand.LogDirMoveState> 
states = new HashMap<>();
+
+        states.put(new TopicPartitionReplica("bar", 0, 0), new 
ReassignPartitionsCommand.CompletedMoveState("/tmp/kafka-logs0"));
+        states.put(new TopicPartitionReplica("foo", 0, 0), new 
ReassignPartitionsCommand.ActiveMoveState("/tmp/kafka-logs0",
+            "/tmp/kafka-logs1", "/tmp/kafka-logs1"));
+        states.put(new TopicPartitionReplica("foo", 1, 0), new 
ReassignPartitionsCommand.CancelledMoveState("/tmp/kafka-logs0",
+            "/tmp/kafka-logs1"));
+        states.put(new TopicPartitionReplica("quux", 0, 0), new 
ReassignPartitionsCommand.MissingReplicaMoveState("/tmp/kafka-logs1"));
+        states.put(new TopicPartitionReplica("quux", 1, 1), new 
ReassignPartitionsCommand.ActiveMoveState("/tmp/kafka-logs0",
+            "/tmp/kafka-logs1", "/tmp/kafka-logs2"));
+        states.put(new TopicPartitionReplica("quux", 2, 1), new 
ReassignPartitionsCommand.MissingLogDirMoveState("/tmp/kafka-logs1"));
+
+        assertEquals(String.join(System.lineSeparator(),
+            "Reassignment of replica bar-0-0 completed successfully.",
+            "Reassignment of replica foo-0-0 is still in progress.",
+            "Partition foo-1 on broker 0 is not being moved from log dir 
/tmp/kafka-logs0 to /tmp/kafka-logs1.",
+            "Partition quux-0 cannot be found in any live log directory on 
broker 0.",
+            "Partition quux-1 on broker 1 is being moved to log dir 
/tmp/kafka-logs2 instead of /tmp/kafka-logs1.",
+            "Partition quux-2 is not found in any live log dir on broker 1. " +
+                "There is likely an offline log directory on the broker."),
+            replicaMoveStatesToString(asScala(states)));
+    }
+
+    @Test
+    public void testGetReplicaAssignments() {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(4).build()) {
+            addTopics(adminClient);
+
+            Map<TopicPartition, Seq<Object>> assignments = new HashMap<>();
+
+            assignments.put(new TopicPartition("foo", 0), seq(0, 1, 2));
+            assignments.put(new TopicPartition("foo", 1), seq(1, 2, 3));
+
+            assertEquals(asScala(assignments), 
getReplicaAssignmentForTopics(adminClient, seq("foo")));
+
+            assignments.clear();
+
+            assignments.put(new TopicPartition("foo", 0), seq(0, 1, 2));
+            assignments.put(new TopicPartition("bar", 0), seq(2, 3, 0));
+
+            assertEquals(asScala(assignments),
+                getReplicaAssignmentForPartitions(adminClient, set(new 
TopicPartition("foo", 0), new TopicPartition("bar", 0))));
+        }
+    }
+
+    @Test
+    public void testGetBrokerRackInformation() {
+        try (MockAdminClient adminClient = new MockAdminClient.Builder().
+            brokers(Arrays.asList(new Node(0, "localhost", 9092, "rack0"),
+                new Node(1, "localhost", 9093, "rack1"),
+                new Node(2, "localhost", 9094, null))).
+            build()) {
+
+            assertEquals(seq(
+                new BrokerMetadata(0, Optional.of("rack0")),
+                new BrokerMetadata(1, Optional.of("rack1"))
+            ), getBrokerMetadata(adminClient, seq(0, 1), true));
+            assertEquals(seq(
+                new BrokerMetadata(0, Optional.empty()),
+                new BrokerMetadata(1, Optional.empty())
+            ), getBrokerMetadata(adminClient, seq(0, 1), false));
+            assertStartsWith("Not all brokers have rack information",
+                assertThrows(AdminOperationException.class,
+                    () -> getBrokerMetadata(adminClient, seq(1, 2), 
true)).getMessage());
+            assertEquals(seq(
+                new BrokerMetadata(1, Optional.empty()),
+                new BrokerMetadata(2, Optional.empty())
+            ), getBrokerMetadata(adminClient, seq(1, 2), false));
+        }
+    }
+
+    @Test
+    public void testParseGenerateAssignmentArgs() {
+        assertStartsWith("Broker list contains duplicate entries",
+            assertThrows(AdminCommandFailedException.class, () -> 
parseGenerateAssignmentArgs(
+                "{\"topics\": [{\"topic\": \"foo\"}], \"version\":1}", 
"1,1,2"),
+                "Expected to detect duplicate broker list 
entries").getMessage());
+        assertStartsWith("Broker list contains duplicate entries",
+            assertThrows(AdminCommandFailedException.class, () -> 
parseGenerateAssignmentArgs(
+                "{\"topics\": [{\"topic\": \"foo\"}], \"version\":1}", 
"5,2,3,4,5"),
+                "Expected to detect duplicate broker list 
entries").getMessage());
+        assertEquals(new Tuple2<>(seq(5, 2, 3, 4), seq("foo")),
+            parseGenerateAssignmentArgs("{\"topics\": [{\"topic\": \"foo\"}], 
\"version\":1}", "5,2,3,4"));
+        assertStartsWith("List of topics to reassign contains duplicate 
entries",
+            assertThrows(AdminCommandFailedException.class, () -> 
parseGenerateAssignmentArgs(
+                "{\"topics\": [{\"topic\": \"foo\"},{\"topic\": \"foo\"}], 
\"version\":1}", "5,2,3,4"),
+                "Expected to detect duplicate topic entries").getMessage());
+        assertEquals(new Tuple2<>(seq(5, 3, 4), seq("foo", "bar")),
+            parseGenerateAssignmentArgs(
+                "{\"topics\": [{\"topic\": \"foo\"},{\"topic\": \"bar\"}], 
\"version\":1}", "5,3,4"));
+    }
+
+    @Test
+    public void testGenerateAssignmentFailsWithoutEnoughReplicas() {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(4).build()) {
+            addTopics(adminClient);
+            assertStartsWith("Replication factor: 3 larger than available 
brokers: 2",
+                assertThrows(InvalidReplicationFactorException.class,
+                    () -> generateAssignment(adminClient, 
"{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"bar\"}]}", "0,1", false),
+                    "Expected generateAssignment to fail").getMessage());
+        }
+    }
+
+    @Test
+    public void testGenerateAssignmentWithInvalidPartitionsFails() {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(5).build()) {
+            addTopics(adminClient);
+            assertStartsWith("Topic quux not found",
+                assertThrows(ExecutionException.class,
+                    () -> generateAssignment(adminClient, 
"{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"quux\"}]}", "0,1", false),
+                    "Expected generateAssignment to 
fail").getCause().getMessage());
+        }
+    }
+
+    @Test
+    public void testGenerateAssignmentWithInconsistentRacks() {
+        try (MockAdminClient adminClient = new MockAdminClient.Builder().
+            brokers(Arrays.asList(
+                new Node(0, "localhost", 9092, "rack0"),
+                new Node(1, "localhost", 9093, "rack0"),
+                new Node(2, "localhost", 9094, null),
+                new Node(3, "localhost", 9095, "rack1"),
+                new Node(4, "localhost", 9096, "rack1"),
+                new Node(5, "localhost", 9097, "rack2"))).
+            build()) {
+
+            addTopics(adminClient);
+            assertStartsWith("Not all brokers have rack information.",
+                assertThrows(AdminOperationException.class,
+                    () -> generateAssignment(adminClient, 
"{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", true),
+                    "Expected generateAssignment to fail").getMessage());
+            // It should succeed when --disable-rack-aware is used.
+            Tuple2<scala.collection.Map<TopicPartition, Seq<Object>>, 
scala.collection.Map<TopicPartition, Seq<Object>>>
+                proposedCurrent = generateAssignment(adminClient, 
"{\"topics\":[{\"topic\":\"foo\"}]}", "0,1,2,3", false);
+
+            Map<TopicPartition, Seq<Object>> expCurrent = new HashMap<>();
+
+            expCurrent.put(new TopicPartition("foo", 0), seq(0, 1, 2));
+            expCurrent.put(new TopicPartition("foo", 1), seq(1, 2, 3));
+
+            assertEquals(asScala(expCurrent), proposedCurrent._2());
+        }
+    }
+
+    @Test
+    public void testGenerateAssignmentWithFewerBrokers() {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(4).build()) {
+            addTopics(adminClient);
+            List<Integer> goalBrokers = Arrays.asList(0, 1, 3);
+
+            Tuple2<scala.collection.Map<TopicPartition, Seq<Object>>, 
scala.collection.Map<TopicPartition, Seq<Object>>>
+                proposedCurrent = generateAssignment(adminClient,
+                    "{\"topics\":[{\"topic\":\"foo\"},{\"topic\":\"bar\"}]}",
+                    
goalBrokers.stream().map(Object::toString).collect(Collectors.joining(",")), 
false);
+
+            Map<TopicPartition, Seq<Object>> expCurrent = new HashMap<>();
+
+            expCurrent.put(new TopicPartition("foo", 0), seq(0, 1, 2));
+            expCurrent.put(new TopicPartition("foo", 1), seq(1, 2, 3));
+            expCurrent.put(new TopicPartition("bar", 0), seq(2, 3, 0));
+
+            assertEquals(asScala(expCurrent), proposedCurrent._2());
+
+            // The proposed assignment should only span the provided brokers
+            proposedCurrent._1().values().foreach(replicas -> {
+                assertTrue(replicas.forall(replica -> 
goalBrokers.contains((Integer) replica)),
+                    "Proposed assignment " + proposedCurrent._1()  + " puts 
replicas on brokers other than " + goalBrokers);
+                return null;
+            });
+        }
+    }
+
+    @Test
+    public void testCurrentPartitionReplicaAssignmentToString() {
+        Map<TopicPartition, Seq<Object>> proposedParts = new HashMap<>();
+
+        proposedParts.put(new TopicPartition("foo", 1), seq(1, 2, 3));
+        proposedParts.put(new TopicPartition("bar", 0), seq(7, 8, 9));
+
+        Map<TopicPartition, Seq<Object>> currentParts = new HashMap<>();
+
+        currentParts.put(new TopicPartition("foo", 0), seq(1, 2, 3));
+        currentParts.put(new TopicPartition("foo", 1), seq(4, 5, 6));
+        currentParts.put(new TopicPartition("bar", 0), seq(7, 8));
+        currentParts.put(new TopicPartition("baz", 0), seq(10, 11, 12));
+
+        assertEquals(String.join(System.lineSeparator(),
+            "Current partition replica assignment",
+            "",
+            "{\"version\":1,\"partitions\":" +
+                
"[{\"topic\":\"bar\",\"partition\":0,\"replicas\":[7,8],\"log_dirs\":[\"any\",\"any\"]},"
 +
+                
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[4,5,6],\"log_dirs\":[\"any\",\"any\",\"any\"]}]"
 +
+                "}",
+            "",
+            "Save this to use as the --reassignment-json-file option during 
rollback"),
+            currentPartitionReplicaAssignmentToString(asScala(proposedParts), 
asScala(currentParts))
+        );
+    }
+
+    @Test
+    public void testMoveMap() {
+        // overwrite foo-0 with different reassignments
+        // keep old reassignments of foo-1
+        // overwrite foo-2 with same reassignments
+        // overwrite foo-3 with new reassignments without overlap of old 
reassignments
+        // overwrite foo-4 with a subset of old reassignments
+        // overwrite foo-5 with a superset of old reassignments
+        // add new reassignments to bar-0
+        Map<TopicPartition, PartitionReassignment> currentReassignments = new 
HashMap<>();
+
+        currentReassignments.put(new TopicPartition("foo", 0), new 
PartitionReassignment(
+            Arrays.asList(1, 2, 3, 4), Arrays.asList(4), Arrays.asList(3)));
+        currentReassignments.put(new TopicPartition("foo", 1), new 
PartitionReassignment(
+            Arrays.asList(4, 5, 6, 7, 8), Arrays.asList(7, 8), 
Arrays.asList(4, 5)));
+        currentReassignments.put(new TopicPartition("foo", 2), new 
PartitionReassignment(
+            Arrays.asList(1, 2, 3, 4), Arrays.asList(3, 4), Arrays.asList(1, 
2)));
+        currentReassignments.put(new TopicPartition("foo", 3), new 
PartitionReassignment(
+            Arrays.asList(1, 2, 3, 4), Arrays.asList(3, 4), Arrays.asList(1, 
2)));
+        currentReassignments.put(new TopicPartition("foo", 4), new 
PartitionReassignment(
+            Arrays.asList(1, 2, 3, 4), Arrays.asList(3, 4), Arrays.asList(1, 
2)));
+        currentReassignments.put(new TopicPartition("foo", 5), new 
PartitionReassignment(
+            Arrays.asList(1, 2, 3, 4), Arrays.asList(3, 4), Arrays.asList(1, 
2)));
+
+        Map<TopicPartition, Seq<Object>> proposedParts = new HashMap<>();
+
+        proposedParts.put(new TopicPartition("foo", 0), seq(1, 2, 5));
+        proposedParts.put(new TopicPartition("foo", 2), seq(3, 4));
+        proposedParts.put(new TopicPartition("foo", 3), seq(5, 6));
+        proposedParts.put(new TopicPartition("foo", 4), seq(3));
+        proposedParts.put(new TopicPartition("foo", 5), seq(3, 4, 5, 6));
+        proposedParts.put(new TopicPartition("bar", 0), seq(1, 2, 3));
+
+        Map<TopicPartition, Seq<Object>> currentParts = new HashMap<>();
+
+        currentParts.put(new TopicPartition("foo", 0), seq(1, 2, 3, 4));
+        currentParts.put(new TopicPartition("foo", 1), seq(4, 5, 6, 7, 8));
+        currentParts.put(new TopicPartition("foo", 2), seq(1, 2, 3, 4));
+        currentParts.put(new TopicPartition("foo", 3), seq(1, 2, 3, 4));
+        currentParts.put(new TopicPartition("foo", 4), seq(1, 2, 3, 4));
+        currentParts.put(new TopicPartition("foo", 5), seq(1, 2, 3, 4));
+        currentParts.put(new TopicPartition("bar", 0), seq(2, 3, 4));
+        currentParts.put(new TopicPartition("baz", 0), seq(1, 2, 3));
+
+        scala.collection.mutable.Map<String, 
scala.collection.mutable.Map<Object, ReassignPartitionsCommand.PartitionMove>>
+            moveMap = calculateProposedMoveMap(asScala(currentReassignments), 
asScala(proposedParts), asScala(currentParts));
+
+        Map<Integer, ReassignPartitionsCommand.PartitionMove> fooMoves = new 
HashMap<>();
+
+        fooMoves.put(0, new 
ReassignPartitionsCommand.PartitionMove(mutableSet(1, 2, 3), mutableSet(5)));
+        fooMoves.put(1, new 
ReassignPartitionsCommand.PartitionMove(mutableSet(4, 5, 6), mutableSet(7, 8)));
+        fooMoves.put(2, new 
ReassignPartitionsCommand.PartitionMove(mutableSet(1, 2), mutableSet(3, 4)));
+        fooMoves.put(3, new 
ReassignPartitionsCommand.PartitionMove(mutableSet(1, 2), mutableSet(5, 6)));
+        fooMoves.put(4, new 
ReassignPartitionsCommand.PartitionMove(mutableSet(1, 2), mutableSet(3)));
+        fooMoves.put(5, new 
ReassignPartitionsCommand.PartitionMove(mutableSet(1, 2), mutableSet(3, 4, 5, 
6)));
+
+        Map<Integer, ReassignPartitionsCommand.PartitionMove> barMoves = new 
HashMap<>();
+
+        barMoves.put(0, new 
ReassignPartitionsCommand.PartitionMove(mutableSet(2, 3, 4), mutableSet(1)));
+
+        assertEquals(asScala(fooMoves), moveMap.get("foo").get());
+        assertEquals(asScala(barMoves), moveMap.get("bar").get());
+
+        Map<String, String> expLeaderThrottle = new HashMap<>();
+
+        expLeaderThrottle.put("foo", 
"0:1,0:2,0:3,1:4,1:5,1:6,2:1,2:2,3:1,3:2,4:1,4:2,5:1,5:2");
+        expLeaderThrottle.put("bar", "0:2,0:3,0:4");
+
+        assertEquals(asScala(expLeaderThrottle), 
calculateLeaderThrottles(moveMap));
+
+        Map<String, String> expFollowerThrottle = new HashMap<>();
+
+        expFollowerThrottle.put("foo", 
"0:5,1:7,1:8,2:3,2:4,3:5,3:6,4:3,5:3,5:4,5:5,5:6");
+        expFollowerThrottle.put("bar", "0:1");
+
+        assertEquals(asScala(expFollowerThrottle), 
calculateFollowerThrottles(moveMap));
+
+        assertEquals(set(1, 2, 3, 4, 5, 6, 7, 8), 
calculateReassigningBrokers(moveMap));
+        assertEquals(set(0, 2), calculateMovingBrokers(set(
+            new TopicPartitionReplica("quux", 0, 0),
+            new TopicPartitionReplica("quux", 1, 2))));
+    }
+
+    @Test
+    public void testParseExecuteAssignmentArgs() {
+        assertStartsWith("Partition reassignment list cannot be empty",
+            assertThrows(AdminCommandFailedException.class,
+                () -> 
parseExecuteAssignmentArgs("{\"version\":1,\"partitions\":[]}"),
+                "Expected to detect empty partition reassignment 
list").getMessage());
+        assertStartsWith("Partition reassignment contains duplicate topic 
partitions",
+            assertThrows(AdminCommandFailedException.class, () -> 
parseExecuteAssignmentArgs(
+                "{\"version\":1,\"partitions\":" +
+                    
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]},"
 +
+                    
"{\"topic\":\"foo\",\"partition\":0,\"replicas\":[2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
+                    "]}"), "Expected to detect a partition list with duplicate 
entries").getMessage());
+        assertStartsWith("Partition reassignment contains duplicate topic 
partitions",
+            assertThrows(AdminCommandFailedException.class, () -> 
parseExecuteAssignmentArgs(
+                "{\"version\":1,\"partitions\":" +
+                    
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"/abc\",\"/def\"]},"
 +
+                    
"{\"topic\":\"foo\",\"partition\":0,\"replicas\":[2,3],\"log_dirs\":[\"/abc\",\"/def\"]}"
 +
+                    "]}"), "Expected to detect a partition replica list with 
duplicate entries").getMessage());
+        assertStartsWith("Partition replica lists may not contain duplicate 
entries",
+            assertThrows(AdminCommandFailedException.class, () -> 
parseExecuteAssignmentArgs(
+                "{\"version\":1,\"partitions\":" +
+                    
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,0],\"log_dirs\":[\"/abc\",\"/def\"]},"
 +
+                    
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[2,3],\"log_dirs\":[\"/abc\",\"/def\"]}"
 +
+                    "]}"), "Expected to detect a partition replica list with 
duplicate entries").getMessage());
+
+        Map<TopicPartition, Seq<Object>> partitionsToBeReassigned = new 
HashMap<>();
+
+        partitionsToBeReassigned.put(new TopicPartition("foo", 0), seq(1, 2, 
3));
+        partitionsToBeReassigned.put(new TopicPartition("foo", 1), seq(3, 4, 
5));
+
+        Tuple2<scala.collection.Map<TopicPartition, Seq<Object>>, 
scala.collection.Map<TopicPartitionReplica, String>> actual = 
parseExecuteAssignmentArgs(
+            "{\"version\":1,\"partitions\":" +
+                
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[1,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]},"
 +
+                
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[3,4,5],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
+                "]}");
+
+        assertEquals(asScala(partitionsToBeReassigned), actual._1);
+        assertTrue(actual._2.isEmpty());
+
+        Map<TopicPartitionReplica, String> replicaAssignment = new HashMap<>();
+
+        replicaAssignment.put(new TopicPartitionReplica("foo", 0, 1), 
"/tmp/a");
+        replicaAssignment.put(new TopicPartitionReplica("foo", 0, 2), 
"/tmp/b");
+        replicaAssignment.put(new TopicPartitionReplica("foo", 0, 3), 
"/tmp/c");
+
+        actual = parseExecuteAssignmentArgs(
+            "{\"version\":1,\"partitions\":" +
+                
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[1,2,3],\"log_dirs\":[\"/tmp/a\",\"/tmp/b\",\"/tmp/c\"]}"
 +
+                "]}");
+
+        assertEquals(asScala(Collections.singletonMap(new 
TopicPartition("foo", 0), seq(1, 2, 3))), actual._1);
+        assertEquals(asScala(replicaAssignment), actual._2);
+    }
+
+    @Test
+    public void testExecuteWithInvalidPartitionsFails() {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(5).build()) {
+            addTopics(adminClient);
+            assertStartsWith("Topic quux not found",
+                assertThrows(ExecutionException.class, () -> 
executeAssignment(adminClient, false,
+                    "{\"version\":1,\"partitions\":" +
+                        
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]},"
 +
+                        
"{\"topic\":\"quux\",\"partition\":0,\"replicas\":[2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
+                        "]}", -1L, -1L, 10000L, Time.SYSTEM), "Expected 
reassignment with non-existent topic to fail").getCause().getMessage());
+        }
+    }
+
+    @Test
+    public void testExecuteWithInvalidBrokerIdFails() {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(4).build()) {
+            addTopics(adminClient);
+            assertStartsWith("Unknown broker id 4",
+                assertThrows(AdminCommandFailedException.class, () -> 
executeAssignment(adminClient, false,
+                    "{\"version\":1,\"partitions\":" +
+                        
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1],\"log_dirs\":[\"any\",\"any\"]},"
 +
+                        
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
+                        "]}", -1L, -1L, 10000L, Time.SYSTEM), "Expected 
reassignment with non-existent broker id to fail").getMessage());
+        }
+    }
+
+    @Test
+    public void testModifyBrokerInterBrokerThrottle() throws Exception {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(4).build()) {
+            modifyInterBrokerThrottle(adminClient, set(0, 1, 2), 1000);
+            modifyInterBrokerThrottle(adminClient, set(0, 3), 100);
+            List<ConfigResource> brokers = new ArrayList<>();
+            for (int i = 0; i < 4; i++)
+                brokers.add(new ConfigResource(ConfigResource.Type.BROKER, 
Integer.toString(i)));
+            Map<ConfigResource, Config> results = 
adminClient.describeConfigs(brokers).all().get();
+            verifyBrokerThrottleResults(results.get(brokers.get(0)), 100, -1);
+            verifyBrokerThrottleResults(results.get(brokers.get(1)), 1000, -1);
+            verifyBrokerThrottleResults(results.get(brokers.get(2)), 1000, -1);
+            verifyBrokerThrottleResults(results.get(brokers.get(3)), 100, -1);
+        }
+    }
+
+    @Test
+    public void testModifyLogDirThrottle() throws Exception {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(4).build()) {
+            modifyLogDirThrottle(adminClient, set(0, 1, 2), 2000);
+            modifyLogDirThrottle(adminClient, set(0, 3), -1);
+
+            List<ConfigResource> brokers = new ArrayList<>();
+            for (int i = 0; i < 4; i++)
+                brokers.add(new ConfigResource(ConfigResource.Type.BROKER, 
Integer.toString(i)));
+
+            Map<ConfigResource, Config> results = 
adminClient.describeConfigs(brokers).all().get();
+
+            verifyBrokerThrottleResults(results.get(brokers.get(0)), -1, 2000);
+            verifyBrokerThrottleResults(results.get(brokers.get(1)), -1, 2000);
+            verifyBrokerThrottleResults(results.get(brokers.get(2)), -1, 2000);
+            verifyBrokerThrottleResults(results.get(brokers.get(3)), -1, -1);
+        }
+    }
+
+    @Test
+    public void testCurReassignmentsToString() {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(4).build()) {
+            addTopics(adminClient);
+            assertEquals("No partition reassignments found.", 
curReassignmentsToString(adminClient));
+
+            Map<TopicPartition, Seq<Object>> reassignments = new HashMap<>();
+
+            reassignments.put(new TopicPartition("foo", 1), seq(4, 5, 3));
+            reassignments.put(new TopicPartition("foo", 0), seq(0, 1, 4, 2));
+            reassignments.put(new TopicPartition("bar", 0), seq(2, 3));
+
+            scala.collection.Map<TopicPartition, Throwable> reassignmentResult 
=
+                alterPartitionReassignments(adminClient, 
asScala(reassignments));
+
+            assertTrue(reassignmentResult.isEmpty());
+            assertEquals(String.join(System.lineSeparator(),
+                "Current partition reassignments:",
+                "bar-0: replicas: 2,3,0. removing: 0.",
+                "foo-0: replicas: 0,1,2. adding: 4.",
+                "foo-1: replicas: 1,2,3. adding: 4,5. removing: 1,2."),
+                curReassignmentsToString(adminClient));
+        }
+    }
+
+    private void verifyBrokerThrottleResults(Config config,
+                                             long expectedInterBrokerThrottle,
+                                             long 
expectedReplicaAlterLogDirsThrottle) {
+        Map<String, String> configs = new HashMap<>();
+        config.entries().forEach(entry -> configs.put(entry.name(), 
entry.value()));
+        if (expectedInterBrokerThrottle >= 0) {
+            assertEquals(Long.toString(expectedInterBrokerThrottle),
+                configs.getOrDefault(brokerLevelLeaderThrottle(), ""));
+            assertEquals(Long.toString(expectedInterBrokerThrottle),
+                configs.getOrDefault(brokerLevelFollowerThrottle(), ""));
+        }
+        if (expectedReplicaAlterLogDirsThrottle >= 0) {
+            assertEquals(Long.toString(expectedReplicaAlterLogDirsThrottle),
+                configs.getOrDefault(brokerLevelLogDirThrottle(), ""));
+        }
+    }
+
+    @Test
+    public void testModifyTopicThrottles() throws Exception {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(4).build()) {
+            addTopics(adminClient);
+
+            Map<String, String> leaderThrottles = new HashMap<>();
+
+            leaderThrottles.put("foo", "leaderFoo");
+            leaderThrottles.put("bar", "leaderBar");
+
+            modifyTopicThrottles(adminClient,
+                asScala(leaderThrottles),
+                asScala(Collections.singletonMap("bar", "followerBar")));
+            List<ConfigResource> topics = Stream.of("bar", "foo").map(
+                id -> new ConfigResource(ConfigResource.Type.TOPIC, 
id)).collect(Collectors.toList());
+            Map<ConfigResource, Config> results = 
adminClient.describeConfigs(topics).all().get();
+            verifyTopicThrottleResults(results.get(topics.get(0)), 
"leaderBar", "followerBar");
+            verifyTopicThrottleResults(results.get(topics.get(1)), 
"leaderFoo", "");
+        }
+    }
+
+    private void verifyTopicThrottleResults(Config config,
+                                            String expectedLeaderThrottle,
+                                            String expectedFollowerThrottle) {
+        Map<String, String> configs = new HashMap<>();
+        config.entries().forEach(entry -> configs.put(entry.name(), 
entry.value()));
+        assertEquals(expectedLeaderThrottle,
+            configs.getOrDefault(topicLevelLeaderThrottle(), ""));
+        assertEquals(expectedFollowerThrottle,
+            configs.getOrDefault(topicLevelFollowerThrottle(), ""));
+    }
+
+    @Test
+    public void testAlterReplicaLogDirs() {
+        try (MockAdminClient adminClient = new MockAdminClient.Builder().
+            numBrokers(4).
+            brokerLogDirs(Collections.nCopies(4,
+                Arrays.asList("/tmp/kafka-logs0", "/tmp/kafka-logs1"))).
+            build()) {
+
+            addTopics(adminClient);
+
+            Map<TopicPartitionReplica, String> assignment = new HashMap<>();
+
+            assignment.put(new TopicPartitionReplica("foo", 0, 0), 
"/tmp/kafka-logs1");
+            assignment.put(new TopicPartitionReplica("quux", 1, 0), 
"/tmp/kafka-logs1");
+
+            assertEquals(
+                set(new TopicPartitionReplica("foo", 0, 0)),
+                alterReplicaLogDirs(adminClient, asScala(assignment))
+            );
+        }
+    }
+
+    public void assertStartsWith(String prefix, String str) {
+        assertTrue(str.startsWith(prefix), String.format("Expected the string 
to start with %s, but it was %s", prefix, str));
+    }
+
+    @Test
+    public void testPropagateInvalidJsonError() {
+        try (MockAdminClient adminClient = new 
MockAdminClient.Builder().numBrokers(4).build()) {
+            addTopics(adminClient);
+            assertStartsWith("Unexpected character",
+                assertThrows(AdminOperationException.class, () -> 
executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L, 
Time.SYSTEM)).getMessage());
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <T> scala.collection.immutable.Set<T> set(final T... set) {
+        return mutableSet(set).toSet();
+    }
+
+    @SuppressWarnings({"deprecation", "unchecked"})
+    private static <T> scala.collection.mutable.Set<T> mutableSet(final 
T...set) {
+        return JavaConverters.asScalaSet(new HashSet<>(Arrays.asList(set)));
+    }
+
+    @SuppressWarnings({"deprecation", "unchecked"})
+    private static <T> Seq<T> seq(T... seq) {
+        return 
JavaConverters.asScalaIteratorConverter(Arrays.asList(seq).iterator()).asScala().toSeq();
+    }
+
+    @SuppressWarnings("deprecation")
+    private static <K, V> scala.collection.Map<K, V> asScala(Map<K, V> jmap) {
+        return JavaConverters.mapAsScalaMap(jmap);
+    }
+}

Reply via email to