This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.2 by this push:
     new c90e97ac1fd KAFKA-20023 Fix kafka-reassign-partitions.sh to handle 
dead brokers (#21222)
c90e97ac1fd is described below

commit c90e97ac1fdda7dab4ceeb8771b099193b7d51ab
Author: Ken Huang <[email protected]>
AuthorDate: Sun Jan 4 00:34:26 2026 +0800

    KAFKA-20023 Fix kafka-reassign-partitions.sh to handle dead brokers (#21222)
    
    Return Node objects instead of broker IDs, and use Node#isEmpty() to filter
    out replicas hosted on dead brokers before querying their log directories.
    
    Test results:
    Broker 4 is alive
    ```
    ./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:19092
    --topics-to-move-json-file tmp.json   --broker-list "2,3,4"   --generate
    Current partition replica assignment
    
{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[3,4],"log_dirs":["/tmp/kraft-broker-logs-1","/tmp/kraft-broker-logs-2"]},{"topic":"test1","partition":1,"replicas":[4,2],"log_dirs":["/tmp/kraft-broker-logs-2","/tmp/kraft-broker-logs"]}]}
    ```
    Broker 4 shut down
    ```
    ./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:19092
    --topics-to-move-json-file tmp.json   --broker-list "2,3,4"   --generate
    Current partition replica assignment
    
{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[3,4],"log_dirs":["/tmp/kraft-broker-logs-1","any"]},{"topic":"test1","partition":1,"replicas":[4,2],"log_dirs":["any","/tmp/kraft-broker-logs"]}]}
    
    Proposed partition reassignment configuration
    
{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"test1","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}]}
    
    ```
    
    Reviewers: PoAn Yang <[email protected]>, Andrew Schofield
     <[email protected]>, Chia-Ping Tsai <[email protected]>
---
 .../tools/reassign/ReassignPartitionsCommand.java  | 115 +++++++++++++++------
 .../reassign/ReassignPartitionsCommandTest.java    |  43 ++++++++
 .../tools/reassign/ReassignPartitionsUnitTest.java |  58 +++++++----
 3 files changed, 164 insertions(+), 52 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 8e1f6114c24..1933e4e3526 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -565,15 +565,16 @@ public class ReassignPartitionsCommand {
         List<Integer> brokersToReassign = t0.getKey();
         List<String> topicsToReassign = t0.getValue();
 
-        Map<TopicPartition, List<Integer>> currentAssignments = 
getReplicaAssignmentForTopics(adminClient, topicsToReassign);
+        Map<TopicPartition, List<Node>> currentAssignments = 
getReplicaAssignmentForTopics(adminClient, topicsToReassign);
         Map<TopicPartitionReplica, String> currentReplicaLogDirs = 
getReplicaToLogDir(adminClient, currentAssignments);
         List<UsableBroker> usableBrokers = getBrokerMetadata(adminClient, 
brokersToReassign, enableRackAwareness);
-        Map<TopicPartition, List<Integer>> proposedAssignments = 
calculateAssignment(currentAssignments, usableBrokers);
+        Map<TopicPartition, List<Integer>> currentParts = 
toReplicaIds(currentAssignments);
+        Map<TopicPartition, List<Integer>> proposedAssignments = 
calculateAssignment(currentParts, usableBrokers);
         System.out.printf("Current partition replica assignment%n%s%n%n",
-            formatAsReassignmentJson(currentAssignments, 
currentReplicaLogDirs));
+            formatAsReassignmentJson(currentParts, currentReplicaLogDirs));
         System.out.printf("Proposed partition reassignment 
configuration%n%s%n",
             formatAsReassignmentJson(proposedAssignments, Map.of()));
-        return Map.entry(proposedAssignments, currentAssignments);
+        return Map.entry(proposedAssignments, currentParts);
     }
 
     /**
@@ -642,14 +643,14 @@ public class ReassignPartitionsCommand {
      * @return                A map from partitions to broker assignments.
      *                        If any topic can't be found, an exception will 
be thrown.
      */
-    static Map<TopicPartition, List<Integer>> 
getReplicaAssignmentForTopics(Admin adminClient,
-                                                                            
List<String> topics
+    static Map<TopicPartition, List<Node>> getReplicaAssignmentForTopics(Admin 
adminClient,
+                                                                         
List<String> topics
     ) throws ExecutionException, InterruptedException {
-        Map<TopicPartition, List<Integer>> res = new HashMap<>();
+        Map<TopicPartition, List<Node>> res = new HashMap<>();
         describeTopics(adminClient, new HashSet<>(topics)).forEach((topicName, 
topicDescription) ->
             topicDescription.partitions().forEach(info -> res.put(
                 new TopicPartition(topicName, info.partition()),
-                
info.replicas().stream().map(Node::id).collect(Collectors.toList())
+                info.replicas()
             )
         ));
         return res;
@@ -663,15 +664,15 @@ public class ReassignPartitionsCommand {
      * @return                A map from partitions to broker assignments.
      *                        If any topic or partition can't be found, an 
exception will be thrown.
      */
-    static Map<TopicPartition, List<Integer>> 
getReplicaAssignmentForPartitions(Admin adminClient,
-                                                                               
 Set<TopicPartition> partitions
+    static Map<TopicPartition, List<Node>> getReplicasForPartitions(Admin 
adminClient,
+                                                                    
Set<TopicPartition> partitions
     ) throws ExecutionException, InterruptedException {
-        Map<TopicPartition, List<Integer>> res = new HashMap<>();
+        Map<TopicPartition, List<Node>> res = new HashMap<>();
         describeTopics(adminClient, 
partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet())).forEach((topicName,
 topicDescription) ->
             topicDescription.partitions().forEach(info -> {
                 TopicPartition tp = new TopicPartition(topicName, 
info.partition());
                 if (partitions.contains(tp))
-                    res.put(tp, 
info.replicas().stream().map(Node::id).collect(Collectors.toList()));
+                    res.put(tp, info.replicas());
             })
         );
 
@@ -684,6 +685,17 @@ public class ReassignPartitionsCommand {
         return res;
     }
 
+    static Map<TopicPartition, List<Integer>> toReplicaIds(
+        Map<TopicPartition, List<Node>> replicaAssignmentForPartitions
+    ) {
+        return replicaAssignmentForPartitions.entrySet()
+                .stream()
+                .collect(Collectors.toMap(
+                    Entry::getKey,
+                    e -> 
e.getValue().stream().map(Node::id).collect(Collectors.toList())
+                ));
+    }
+
     /**
      * Find the rack information for some brokers.
      *
@@ -775,8 +787,10 @@ public class ReassignPartitionsCommand {
         proposedParts.values().forEach(brokers::addAll);
 
         verifyBrokerIds(adminClient, brokers);
-        Map<TopicPartition, List<Integer>> currentParts = 
getReplicaAssignmentForPartitions(adminClient, proposedParts.keySet());
-        
System.out.println(currentPartitionReplicaAssignmentToString(adminClient, 
proposedParts, currentParts));
+        Map<TopicPartition, List<Node>> currentPartsToNode = 
getReplicasForPartitions(adminClient, proposedParts.keySet());
+        Map<TopicPartition, List<Integer>> currentParts = 
toReplicaIds(currentPartsToNode);
+
+        
System.out.println(currentPartitionReplicaAssignmentToString(adminClient, 
proposedParts, currentPartsToNode));
 
         if (interBrokerThrottle >= 0 || logDirThrottle >= 0) {
             System.out.println(YOU_MUST_RUN_VERIFY_PERIODICALLY_MESSAGE);
@@ -919,21 +933,31 @@ public class ReassignPartitionsCommand {
      *
      * @param adminClient                 The admin client object to use.
      * @param proposedParts               The proposed partition assignment.
-     * @param currentParts                The current partition assignment.
+     * @param currentAssignments          The current partition assignment 
with Node information.
      *
      * @return                            The string to print.  We will only 
print information about
      *                                    partitions that appear in the 
proposed partition assignment.
      */
-    static String currentPartitionReplicaAssignmentToString(Admin adminClient,
-                                                            
Map<TopicPartition, List<Integer>> proposedParts,
-                                                            
Map<TopicPartition, List<Integer>> currentParts) throws 
JsonProcessingException, ExecutionException, InterruptedException {
-        Map<TopicPartition, List<Integer>> partitionsToBeReassigned = 
currentParts.entrySet().stream()
+    static String currentPartitionReplicaAssignmentToString(
+        Admin adminClient,
+        Map<TopicPartition, List<Integer>> proposedParts,
+        Map<TopicPartition, List<Node>> currentAssignments
+    ) throws JsonProcessingException, ExecutionException, InterruptedException 
{
+
+        Map<TopicPartition, List<Node>> partitionsToBeReassigned = 
currentAssignments.entrySet()
+            .stream()
             .filter(e -> proposedParts.containsKey(e.getKey()))
             .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
-        Map<TopicPartitionReplica, String> currentReplicaLogDirs = 
getReplicaToLogDir(adminClient, partitionsToBeReassigned);
+
+        Map<TopicPartitionReplica, String> currentReplicaLogDirs = 
getReplicaToLogDir(
+            adminClient,
+            partitionsToBeReassigned
+        );
+
+        Map<TopicPartition, List<Integer>> currentParts = 
toReplicaIds(partitionsToBeReassigned);
 
         return String.format("Current partition replica 
assignment%n%n%s%n%nSave this to use as the %s",
-            formatAsReassignmentJson(partitionsToBeReassigned, 
currentReplicaLogDirs),
+            formatAsReassignmentJson(currentParts, currentReplicaLogDirs),
             "--reassignment-json-file option during rollback");
     }
 
@@ -1519,25 +1543,48 @@ public class ReassignPartitionsCommand {
         return results;
     }
 
+    /**
+     * Get the log directory for each replica.
+     *
+     * @param adminClient The admin client object to use.
+     * @param current The current partition assignment with Node information.
+     * @return Map of TopicPartitionReplica to log directory path.
+     */
     static Map<TopicPartitionReplica, String> getReplicaToLogDir(
         Admin adminClient,
-        Map<TopicPartition, List<Integer>> topicPartitionToReplicas
-    ) throws InterruptedException, ExecutionException {
-        var replicaLogDirs = topicPartitionToReplicas
-                .entrySet()
-                .stream()
-                .flatMap(entry -> entry.getValue()
-                    .stream()
-                    .map(id -> new 
TopicPartitionReplica(entry.getKey().topic(), entry.getKey().partition(), id)))
-                .collect(Collectors.toUnmodifiableSet());
+        Map<TopicPartition, List<Node>> current
+    ) throws ExecutionException, InterruptedException {
+        List<TopicPartitionReplica> availableReplicas = available(current);
+
+        if (availableReplicas.isEmpty()) {
+            return Map.of();
+        }
 
-        return adminClient.describeReplicaLogDirs(replicaLogDirs).all().get()
+        return 
adminClient.describeReplicaLogDirs(availableReplicas).all().get()
                 .entrySet()
                 .stream()
-                .filter(entry -> entry.getValue().getCurrentReplicaLogDir() != 
null)
+                .filter(e -> e.getValue().getCurrentReplicaLogDir() != null)
                 .collect(Collectors.toMap(
                     Entry::getKey,
-                    entry -> entry.getValue().getCurrentReplicaLogDir()
-                ));
+                    e -> e.getValue().getCurrentReplicaLogDir())
+                );
+    }
+
+    /**
+     * Extract available (non-empty) replicas from the assignment.
+     */
+    private static List<TopicPartitionReplica> available(Map<TopicPartition, 
List<Node>> current) {
+        return current.entrySet()
+                .stream()
+                .flatMap(entry -> entry.getValue()
+                    .stream()
+                    .filter(node -> !node.isEmpty())
+                    .map(node -> new TopicPartitionReplica(
+                        entry.getKey().topic(),
+                        entry.getKey().partition(),
+                        node.id()
+                    ))
+                )
+                .toList();
     }
 }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index a8d830b4bd4..a71e4779a3e 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -502,6 +502,49 @@ public class ReassignPartitionsCommandTest {
         }
     }
 
+    @ClusterTest(types = {Type.KRAFT})
+    public void 
testGenerateAssignmentWithOneBootstrapServerShutdownWontTimeout() throws 
Exception {
+        var brokerIdToShutdown = 0;
+        createTopics();
+        var foo0 = new TopicPartition("foo", 0);
+        produceMessages(foo0.topic(), foo0.partition(), 100);
+
+        try (Admin admin = 
Admin.create(Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
clusterInstance.bootstrapServers()))) {
+            String topicsToMoveJson = """
+                    {
+                        "topics": [
+                            { "topic": "foo" }
+                        ],
+                        "version": 1
+                    }
+                    """;
+            clusterInstance.shutdownBroker(brokerIdToShutdown);
+            TestUtils.waitForCondition(
+                    () -> clusterInstance.aliveBrokers().size() == 4,
+                    "Waiting for broker to shutdown failed"
+            );
+            generateAssignment(admin, topicsToMoveJson, "1,2,3", false);
+        }
+    }
+    
+    @ClusterTest(types = {Type.KRAFT})
+    public void 
testExecuteAssignmentWithOneBootstrapServerShutdownWontTimeout() throws 
Exception {
+        var brokerIdToShutdown = 0;
+        createTopics();
+        var foo0 = new TopicPartition("foo", 0);
+        produceMessages(foo0.topic(), foo0.partition(), 100);
+        clusterInstance.shutdownBroker(brokerIdToShutdown);
+        TestUtils.waitForCondition(
+            () -> clusterInstance.aliveBrokers().size() == 4,
+            "Waiting for broker to shutdown failed"
+        );
+        // Execute the assignment
+        String assignment = "{\"version\":1,\"partitions\":" +
+            
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
 +
+            "]}";
+        runExecuteAssignment(false, assignment, -1L, -1L);
+    }
+    
     private void createTopics() {
         try (Admin admin = 
Admin.create(Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
clusterInstance.bootstrapServers()))) {
             Map<Integer, List<Integer>> fooReplicasAssignments = new 
HashMap<>();
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
index 22e9011a2c2..d953ccf9ea7 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
@@ -69,7 +69,6 @@ import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.findLogD
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.findPartitionReassignmentStates;
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.generateAssignment;
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getBrokerMetadata;
-import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForPartitions;
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForTopics;
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaToLogDir;
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyInterBrokerThrottle;
@@ -79,6 +78,7 @@ import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.parseExe
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.parseGenerateAssignmentArgs;
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.partitionReassignmentStatesToString;
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.replicaMoveStatesToString;
+import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.toReplicaIds;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -290,20 +290,30 @@ public class ReassignPartitionsUnitTest {
             assignments.put(new TopicPartition("foo", 0), List.of(0, 1, 2));
             assignments.put(new TopicPartition("foo", 1), List.of(1, 2, 3));
 
-            assertEquals(assignments, 
getReplicaAssignmentForTopics(adminClient, List.of("foo")));
+            assertEquals(
+                assignments,
+                toReplicaIds(getReplicaAssignmentForTopics(adminClient, 
List.of("foo")))
+            );
 
             assignments.clear();
 
             assignments.put(new TopicPartition("foo", 0), List.of(0, 1, 2));
             assignments.put(new TopicPartition("bar", 0), List.of(2, 3, 0));
 
-            assertEquals(assignments,
-                getReplicaAssignmentForPartitions(adminClient, Set.of(new 
TopicPartition("foo", 0), new TopicPartition("bar", 0))));
+            Map<TopicPartition, List<Integer>> actualAssignments = 
toReplicaIds(
+                ReassignPartitionsCommand.getReplicasForPartitions(
+                    adminClient,
+                    Set.of(new TopicPartition("foo", 0), new 
TopicPartition("bar", 0))
+            ));
+            assertEquals(
+                assignments,
+                actualAssignments
+            );
 
             UnknownTopicOrPartitionException exception =
                 assertInstanceOf(UnknownTopicOrPartitionException.class,
                     assertThrows(ExecutionException.class,
-                        () -> getReplicaAssignmentForPartitions(adminClient,
+                        () -> 
ReassignPartitionsCommand.getReplicasForPartitions(adminClient,
                             Set.of(new TopicPartition("foo", 0), new 
TopicPartition("foo", 10)))).getCause());
             assertEquals("Unable to find partition: foo-10", 
exception.getMessage());
         }
@@ -451,25 +461,31 @@ public class ReassignPartitionsUnitTest {
         ) {
 
             List<Node> brokers = adminClient.brokers();
+            Node broker1 = brokers.get(1);
+            Node broker2 = brokers.get(2);
+            Node broker3 = brokers.get(3);
+            Node broker4 = brokers.get(4);
+            Node broker5 = brokers.get(5);
+
             adminClient.addTopic(false, "foo", List.of(
-                new TopicPartitionInfo(1, brokers.get(1),
-                    List.of(brokers.get(1), brokers.get(2), brokers.get(3)),
-                    List.of(brokers.get(1), brokers.get(2), brokers.get(3)))
+                new TopicPartitionInfo(1, broker1,
+                    List.of(broker1, broker2, broker3),
+                    List.of(broker1, broker2, broker3))
             ), Map.of());
 
             adminClient.addTopic(false, "bar", List.of(
-                new TopicPartitionInfo(0, brokers.get(4),
-                    List.of(brokers.get(4), brokers.get(5)),
-                    List.of(brokers.get(4), brokers.get(5)))
+                new TopicPartitionInfo(0, broker4,
+                    List.of(broker4, broker5),
+                    List.of(broker4, broker5))
             ), Map.of());
 
             Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>();
             proposedParts.put(new TopicPartition("foo", 1), List.of(0, 1, 2));
             proposedParts.put(new TopicPartition("bar", 0), List.of(3, 4, 5));
 
-            Map<TopicPartition, List<Integer>> currentParts = new HashMap<>();
-            currentParts.put(new TopicPartition("foo", 1), List.of(1, 2, 3));
-            currentParts.put(new TopicPartition("bar", 0), List.of(4, 5));
+            Map<TopicPartition, List<Node>> currentParts = new HashMap<>();
+            currentParts.put(new TopicPartition("foo", 1), List.of(broker1, 
broker2, broker3));
+            currentParts.put(new TopicPartition("bar", 0), List.of(broker4, 
broker5));
 
             assertEquals(String.join(System.lineSeparator(),
                 "Current partition replica assignment",
@@ -801,10 +817,16 @@ public class ReassignPartitionsUnitTest {
         ) {
             addTopics(adminClient);
 
-            Map<TopicPartition, List<Integer>> topicPartitionToReplicas = 
Map.of(
-                new TopicPartition("foo", 0), List.of(0, 1, 2),
-                new TopicPartition("foo", 1), List.of(1, 2, 3),
-                new TopicPartition("bar", 0), List.of(2, 3, 0)
+            List<Node> brokers = adminClient.brokers();
+            Node broker0 = brokers.get(0);
+            Node broker1 = brokers.get(1);
+            Node broker2 = brokers.get(2);
+            Node broker3 = brokers.get(3);
+
+            Map<TopicPartition, List<Node>> topicPartitionToReplicas = Map.of(
+                new TopicPartition("foo", 0), List.of(broker0, broker1, 
broker2),
+                new TopicPartition("foo", 1), List.of(broker1, broker2, 
broker3),
+                new TopicPartition("bar", 0), List.of(broker2, broker3, 
broker0)
             );
 
             Map<TopicPartitionReplica, String> result = 
getReplicaToLogDir(adminClient, topicPartitionToReplicas);

Reply via email to