This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 51f2c7b2b65 MINOR: fix reassign command bug (#20003)
51f2c7b2b65 is described below

commit 51f2c7b2b650ab24f4dcf557e32f691b73cc71d0
Author: Lan Ding <isdin...@163.com>
AuthorDate: Wed Jun 25 02:34:13 2025 +0800

    MINOR: fix reassign command bug (#20003)
    
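    See the original Scala implementation of this check:
    https://github.com/apache/kafka/blob/9570c67b8c4ed1a4c3888511adad58d9b3a8bc0f/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala#L1208

    During the rewrite for
    [KAFKA-14595](https://github.com/apache/kafka/pull/13247), the negation
    on the `addingReplicas().isEmpty()` condition was omitted. As a result,
    a partition whose reassignment only adds replicas (and removes none)
    was not added to the set of currently reassigning partitions.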
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../tools/reassign/ReassignPartitionsCommand.java  |  4 +--
 .../reassign/ReassignPartitionsCommandTest.java    | 38 +++++++++++++++++++++-
 2 files changed, 39 insertions(+), 3 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 72c49410e13..4893bb25830 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -1271,7 +1271,7 @@ public class ReassignPartitionsCommand {
         Set<TopicPartition> targetPartsSet = targetParts.stream().map(t -> 
t.getKey()).collect(Collectors.toSet());
         Set<TopicPartition> curReassigningParts = new HashSet<>();
         
adminClient.listPartitionReassignments(targetPartsSet).reassignments().get().forEach((part,
 reassignment) -> {
-            if (reassignment.addingReplicas().isEmpty() || 
!reassignment.removingReplicas().isEmpty())
+            if (!reassignment.addingReplicas().isEmpty() || 
!reassignment.removingReplicas().isEmpty())
                 curReassigningParts.add(part);
         });
         if (!curReassigningParts.isEmpty()) {
@@ -1440,7 +1440,7 @@ public class ReassignPartitionsCommand {
         }
 
         OptionSpec<?> action = allActions.get(0);
-        
+
         if (opts.options.has(opts.bootstrapServerOpt) && 
opts.options.has(opts.bootstrapControllerOpt))
             CommandLineUtils.printUsageAndExit(opts.parser, "Please don't 
specify both --bootstrap-server and --bootstrap-controller");
         else if (!opts.options.has(opts.bootstrapServerOpt) && 
!opts.options.has(opts.bootstrapControllerOpt))
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index 1ee55c6eace..069a64234c6 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -316,7 +316,7 @@ public class ReassignPartitionsCommandTest {
     }
 
     @ClusterTest
-    public void testCancellationWithAddingReplicaInIsr() throws Exception {
+    public void testCancellationWithAddingAndRemovingReplicaInIsr() throws 
Exception {
         createTopics();
         TopicPartition foo0 = new TopicPartition("foo", 0);
         produceMessages(foo0.topic(), foo0.partition(), 200);
@@ -351,6 +351,42 @@ public class ReassignPartitionsCommandTest {
         verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), 
foo0.partition(), 4));
     }
 
+    @ClusterTest
+    public void testCancellationWithAddingReplicaInIsr() throws Exception {
+        createTopics();
+        TopicPartition foo0 = new TopicPartition("foo", 0);
+        produceMessages(foo0.topic(), foo0.partition(), 200);
+
+        // The reassignment will bring replicas 3 and 4 into the replica set.
+        String assignment = "{\"version\":1,\"partitions\":" +
+            
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[0,1,2,3,4],\"log_dirs\":[\"any\",\"any\",\"any\",\"any\",\"any\"]}"
 +
+            "]}";
+
+        // We will throttle replica 4 so that only replica 3 joins the ISR
+        setReplicationThrottleForPartitions(foo0);
+
+        // Execute the assignment and wait for replica 3 (only) to join the ISR
+        runExecuteAssignment(false, assignment, -1L, -1L);
+        try (Admin admin = 
Admin.create(Collections.singletonMap(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
 clusterInstance.bootstrapServers()))) {
+            TestUtils.waitForCondition(
+                () -> {
+                    Set<Integer> isr = 
admin.describeTopics(Collections.singleton(foo0.topic()))
+                        
.allTopicNames().get().get(foo0.topic()).partitions().stream()
+                        .filter(p -> p.partition() == foo0.partition())
+                        .flatMap(p -> p.isr().stream())
+                        .map(Node::id).collect(Collectors.toSet());
+                    return isr.containsAll(Arrays.asList(0, 1, 2, 3));
+                },
+                "Timed out while waiting for replica 3 to join the ISR"
+            );
+        }
+
+        // Now cancel the assignment and verify that the partition is removed 
from cancelled replicas
+        assertEquals(new AbstractMap.SimpleImmutableEntry<>(singleton(foo0), 
Collections.emptySet()), runCancelAssignment(assignment, true, true));
+        verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), 
foo0.partition(), 3));
+        verifyReplicaDeleted(new TopicPartitionReplica(foo0.topic(), 
foo0.partition(), 4));
+    }
+
     /**
      * Test moving partitions between directories.
      */

Reply via email to