This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 51f2c7b2b65 MINOR: fix reassign command bug (#20003)
51f2c7b2b65 is described below

commit 51f2c7b2b650ab24f4dcf557e32f691b73cc71d0
Author: Lan Ding <isdin...@163.com>
AuthorDate: Wed Jun 25 02:34:13 2025 +0800

    MINOR: fix reassign command bug (#20003)

    see
    https://github.com/apache/kafka/blob/9570c67b8c4ed1a4c3888511adad58d9b3a8bc0f/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala#L1208
    During the rewrite for
    [KAFKA-14595](https://github.com/apache/kafka/pull/13247), the relevant
    condition was omitted.

    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../tools/reassign/ReassignPartitionsCommand.java | 4 +--
 .../reassign/ReassignPartitionsCommandTest.java | 38 +++++++++++++++++++++-
 2 files changed, 39 insertions(+), 3 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 72c49410e13..4893bb25830 100644
--- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -1271,7 +1271,7 @@ public class ReassignPartitionsCommand {
         Set<TopicPartition> targetPartsSet = targetParts.stream().map(t -> t.getKey()).collect(Collectors.toSet());
         Set<TopicPartition> curReassigningParts = new HashSet<>();
         adminClient.listPartitionReassignments(targetPartsSet).reassignments().get().forEach((part, reassignment) -> {
-            if (reassignment.addingReplicas().isEmpty() || !reassignment.removingReplicas().isEmpty())
+            if (!reassignment.addingReplicas().isEmpty() || !reassignment.removingReplicas().isEmpty())
                 curReassigningParts.add(part);
         });
         if (!curReassigningParts.isEmpty()) {
@@ -1440,7 +1440,7 @@ public class ReassignPartitionsCommand {
         }
 
         OptionSpec<?> action = allActions.get(0);
-        
+
         if (opts.options.has(opts.bootstrapServerOpt) && opts.options.has(opts.bootstrapControllerOpt))
             CommandLineUtils.printUsageAndExit(opts.parser, "Please don't specify both --bootstrap-server and --bootstrap-controller");
         else if (!opts.options.has(opts.bootstrapServerOpt) && !opts.options.has(opts.bootstrapControllerOpt))
diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index 1ee55c6eace..069a64234c6 100644
--- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -316,7 +316,7 @@ public class ReassignPartitionsCommandTest {
     }
 
     @ClusterTest
-    public void testCancellationWithAddingReplicaInIsr() throws Exception {
+    public void testCancellationWithAddingAndRemovingReplicaInIsr() throws Exception {
         createTopics();
         TopicPartition foo0 = new TopicPartition("foo", 0);
         produceMessages(foo0.topic(), foo0.partition(), 200);
@@ -351,6 +351,42 @@ public class ReassignPartitionsCommandTest {
         verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4));
     }
 
+    @ClusterTest
+    public void testCancellationWithAddingReplicaInIsr() throws Exception {
+        createTopics();
+        TopicPartition foo0 = new TopicPartition("foo", 0);
+        produceMessages(foo0.topic(), foo0.partition(), 200);
+
+        // The reassignment will bring replicas 3 and 4 into the replica set.
+        String assignment = "{\"version\":1,\"partitions\":" +
+            "[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\",\"any\",\"any\"]}" +
+            "]}";
+
+        // We will throttle replica 4 so that only replica 3 joins the ISR
+        setReplicationThrottleForPartitions(foo0);
+
+        // Execute the assignment and wait for replica 3 (only) to join the ISR
+        runExecuteAssignment(false, assignment, -1L, -1L);
+        try (Admin admin = Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, clusterInstance.bootstrapServers()))) {
+            TestUtils.waitForCondition(
+                () -> {
+                    Set<Integer> isr = admin.describeTopics(Collections.singleton(foo0.topic()))
+                        .allTopicNames().get().get(foo0.topic()).partitions().stream()
+                        .filter(p -> p.partition() == foo0.partition())
+                        .flatMap(p -> p.isr().stream())
+                        .map(Node::id).collect(Collectors.toSet());
+                    return isr.containsAll(Arrays.asList(0, 1, 2, 3));
+                },
+                "Timed out while waiting for replica 3 to join the ISR"
+            );
+        }
+
+        // Now cancel the assignment and verify that the partition is removed from cancelled replicas
+        assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), Collections.emptySet()), runCancelAssignment(assignment, true, true));
+        verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 3));
+        verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), foo0.partition(), 4));
+    }
+
     /**
      * Test moving partitions between directories.
      */