This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 47bb46c10dc KAFKA-19582 the current assignments shown by 
ReassignPartitionsCommand should include the log directories (#20319)
47bb46c10dc is described below

commit 47bb46c10dc82dc6ba584ca1659400d899f92893
Author: Ken Huang <s7133...@gmail.com>
AuthorDate: Sat Aug 23 02:57:00 2025 +0800

    KAFKA-19582 the current assignments shown by ReassignPartitionsCommand 
should include the log directories (#20319)
    
    The ReassignPartitionsCommand shows the topic replicas on each broker.
    When using the --generate command, it returns the current partition
    replica assignment. However, the log directory for each current replica
    is always shown as any. This makes it impossible for users to determine
    which specific log directory is being used by each replica. Therefore,
    we should fix this behavior.
    
    ```
    Current partition replica assignment
    {
      "version": 1,
      "partitions": [
        {
          "topic": "test1",
          "partition": 0,
          "replicas": [
            4,
            2
          ],
          "log_dirs": [
            "any",
            "any"
          ]
        }
      ]
    }
    ```
    
    This PR
    ```
    Current partition replica assignment
    {
      "version": 1,
      "partitions": [
        {
          "topic": "test1",
          "partition": 0,
          "replicas": [
            4,
            2
          ],
          "log_dirs": [
            "/tmp/kraft-broker-logs234",
            "/tmp/kraft-broker-logs"
          ]
        }
      ]
    }
    ```
    
    Reviewers: PoAn Yang <pay...@apache.org>, Jhen-Yung Hsu
     <jhenyung...@gmail.com>, TaiJuWu <tjwu1...@gmail.com>, Chia-Ping Tsai
     <chia7...@gmail.com>
---
 .../tools/reassign/ReassignPartitionsCommand.java  |  36 ++++++--
 .../tools/reassign/ReassignPartitionsUnitTest.java | 100 ++++++++++++++++-----
 2 files changed, 109 insertions(+), 27 deletions(-)

diff --git 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 4628c34fe18..8e1f6114c24 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -566,10 +566,11 @@ public class ReassignPartitionsCommand {
         List<String> topicsToReassign = t0.getValue();
 
         Map<TopicPartition, List<Integer>> currentAssignments = 
getReplicaAssignmentForTopics(adminClient, topicsToReassign);
+        Map<TopicPartitionReplica, String> currentReplicaLogDirs = 
getReplicaToLogDir(adminClient, currentAssignments);
         List<UsableBroker> usableBrokers = getBrokerMetadata(adminClient, 
brokersToReassign, enableRackAwareness);
         Map<TopicPartition, List<Integer>> proposedAssignments = 
calculateAssignment(currentAssignments, usableBrokers);
         System.out.printf("Current partition replica assignment%n%s%n%n",
-            formatAsReassignmentJson(currentAssignments, Map.of()));
+            formatAsReassignmentJson(currentAssignments, 
currentReplicaLogDirs));
         System.out.printf("Proposed partition reassignment 
configuration%n%s%n",
             formatAsReassignmentJson(proposedAssignments, Map.of()));
         return Map.entry(proposedAssignments, currentAssignments);
@@ -775,7 +776,7 @@ public class ReassignPartitionsCommand {
 
         verifyBrokerIds(adminClient, brokers);
         Map<TopicPartition, List<Integer>> currentParts = 
getReplicaAssignmentForPartitions(adminClient, proposedParts.keySet());
-        
System.out.println(currentPartitionReplicaAssignmentToString(proposedParts, 
currentParts));
+        
System.out.println(currentPartitionReplicaAssignmentToString(adminClient, 
proposedParts, currentParts));
 
         if (interBrokerThrottle >= 0 || logDirThrottle >= 0) {
             System.out.println(YOU_MUST_RUN_VERIFY_PERIODICALLY_MESSAGE);
@@ -916,20 +917,23 @@ public class ReassignPartitionsCommand {
     /**
      * Return the string which we want to print to describe the current 
partition assignment.
      *
+     * @param adminClient                 The admin client object to use.
      * @param proposedParts               The proposed partition assignment.
      * @param currentParts                The current partition assignment.
      *
      * @return                            The string to print.  We will only 
print information about
      *                                    partitions that appear in the 
proposed partition assignment.
      */
-    static String 
currentPartitionReplicaAssignmentToString(Map<TopicPartition, List<Integer>> 
proposedParts,
-                                                            
Map<TopicPartition, List<Integer>> currentParts) throws JsonProcessingException 
{
+    static String currentPartitionReplicaAssignmentToString(Admin adminClient,
+                                                            
Map<TopicPartition, List<Integer>> proposedParts,
+                                                            
Map<TopicPartition, List<Integer>> currentParts) throws 
JsonProcessingException, ExecutionException, InterruptedException {
         Map<TopicPartition, List<Integer>> partitionsToBeReassigned = 
currentParts.entrySet().stream()
             .filter(e -> proposedParts.containsKey(e.getKey()))
             .collect(Collectors.toMap(Entry::getKey, Entry::getValue));
+        Map<TopicPartitionReplica, String> currentReplicaLogDirs = 
getReplicaToLogDir(adminClient, partitionsToBeReassigned);
 
         return String.format("Current partition replica 
assignment%n%n%s%n%nSave this to use as the %s",
-            formatAsReassignmentJson(partitionsToBeReassigned, Map.of()),
+            formatAsReassignmentJson(partitionsToBeReassigned, 
currentReplicaLogDirs),
             "--reassignment-json-file option during rollback");
     }
 
@@ -1514,4 +1518,26 @@ public class ReassignPartitionsCommand {
         }
         return results;
     }
+
+    static Map<TopicPartitionReplica, String> getReplicaToLogDir(
+        Admin adminClient,
+        Map<TopicPartition, List<Integer>> topicPartitionToReplicas
+    ) throws InterruptedException, ExecutionException {
+        var replicaLogDirs = topicPartitionToReplicas
+                .entrySet()
+                .stream()
+                .flatMap(entry -> entry.getValue()
+                    .stream()
+                    .map(id -> new 
TopicPartitionReplica(entry.getKey().topic(), entry.getKey().partition(), id)))
+                .collect(Collectors.toUnmodifiableSet());
+
+        return adminClient.describeReplicaLogDirs(replicaLogDirs).all().get()
+                .entrySet()
+                .stream()
+                .filter(entry -> entry.getValue().getCurrentReplicaLogDir() != 
null)
+                .collect(Collectors.toMap(
+                    Entry::getKey,
+                    entry -> entry.getValue().getCurrentReplicaLogDir()
+                ));
+    }
 }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
index 949b595a115..22e9011a2c2 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
@@ -71,6 +71,7 @@ import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.generate
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getBrokerMetadata;
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForPartitions;
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForTopics;
+import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaToLogDir;
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyInterBrokerThrottle;
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyLogDirThrottle;
 import static 
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyTopicThrottles;
@@ -436,29 +437,50 @@ public class ReassignPartitionsUnitTest {
 
     @Test
     public void testCurrentPartitionReplicaAssignmentToString() throws 
Exception {
-        Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>();
-
-        proposedParts.put(new TopicPartition("foo", 1), List.of(1, 2, 3));
-        proposedParts.put(new TopicPartition("bar", 0), List.of(7, 8, 9));
-
-        Map<TopicPartition, List<Integer>> currentParts = new HashMap<>();
-
-        currentParts.put(new TopicPartition("foo", 0), List.of(1, 2, 3));
-        currentParts.put(new TopicPartition("foo", 1), List.of(4, 5, 6));
-        currentParts.put(new TopicPartition("bar", 0), List.of(7, 8));
-        currentParts.put(new TopicPartition("baz", 0), List.of(10, 11, 12));
+        try (MockAdminClient adminClient = new MockAdminClient.Builder()
+                .numBrokers(6)
+                .brokerLogDirs(List.of(
+                    List.of("/tmp/broker0/logs"),
+                    List.of("/tmp/broker1/logs"),
+                    List.of("/tmp/broker2/logs"),
+                    List.of("/tmp/broker3/logs"),
+                    List.of("/tmp/broker4/logs"),
+                    List.of("/tmp/broker5/logs")
+                ))
+                .build()
+        ) {
+
+            List<Node> brokers = adminClient.brokers();
+            adminClient.addTopic(false, "foo", List.of(
+                new TopicPartitionInfo(1, brokers.get(1),
+                    List.of(brokers.get(1), brokers.get(2), brokers.get(3)),
+                    List.of(brokers.get(1), brokers.get(2), brokers.get(3)))
+            ), Map.of());
+
+            adminClient.addTopic(false, "bar", List.of(
+                new TopicPartitionInfo(0, brokers.get(4),
+                    List.of(brokers.get(4), brokers.get(5)),
+                    List.of(brokers.get(4), brokers.get(5)))
+            ), Map.of());
+
+            Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>();
+            proposedParts.put(new TopicPartition("foo", 1), List.of(0, 1, 2));
+            proposedParts.put(new TopicPartition("bar", 0), List.of(3, 4, 5));
+
+            Map<TopicPartition, List<Integer>> currentParts = new HashMap<>();
+            currentParts.put(new TopicPartition("foo", 1), List.of(1, 2, 3));
+            currentParts.put(new TopicPartition("bar", 0), List.of(4, 5));
 
-        assertEquals(String.join(System.lineSeparator(),
-            "Current partition replica assignment",
-            "",
-            "{\"version\":1,\"partitions\":" +
-                
"[{\"topic\":\"bar\",\"partition\":0,\"replicas\":[7,8],\"log_dirs\":[\"any\",\"any\"]},"
 +
-                
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[4,5,6],\"log_dirs\":[\"any\",\"any\",\"any\"]}]"
 +
-                "}",
-            "",
-            "Save this to use as the --reassignment-json-file option during 
rollback"),
-            currentPartitionReplicaAssignmentToString(proposedParts, 
currentParts)
-        );
+            assertEquals(String.join(System.lineSeparator(),
+                "Current partition replica assignment",
+                "",
+                
"{\"version\":1,\"partitions\":[{\"topic\":\"bar\",\"partition\":0,\"replicas\":[4,5],\"log_dirs\":[\"/tmp/broker4/logs\",\"/tmp/broker4/logs\"]},"
 +
+                    
"{\"topic\":\"foo\",\"partition\":1,\"replicas\":[1,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}]}",
+                "",
+                "Save this to use as the --reassignment-json-file option 
during rollback"),
+                currentPartitionReplicaAssignmentToString(adminClient, 
proposedParts, currentParts)
+            );
+        }
     }
 
     @Test
@@ -765,4 +787,38 @@ public class ReassignPartitionsUnitTest {
                 assertThrows(AdminOperationException.class, () -> 
executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L, 
Time.SYSTEM, false)).getMessage());
         }
     }
+
+    @Test
+    public void testGetReplicaToLogDir() throws Exception {
+        try (MockAdminClient adminClient = new MockAdminClient.Builder()
+                .numBrokers(4)
+                .brokerLogDirs(List.of(
+                    List.of("/tmp/broker0/logs0"),
+                    List.of("/tmp/broker1/logs0"),
+                    List.of("/tmp/broker2/logs0"),
+                    List.of("/tmp/broker3/logs0")
+                )).build()
+        ) {
+            addTopics(adminClient);
+
+            Map<TopicPartition, List<Integer>> topicPartitionToReplicas = 
Map.of(
+                new TopicPartition("foo", 0), List.of(0, 1, 2),
+                new TopicPartition("foo", 1), List.of(1, 2, 3),
+                new TopicPartition("bar", 0), List.of(2, 3, 0)
+            );
+
+            Map<TopicPartitionReplica, String> result = 
getReplicaToLogDir(adminClient, topicPartitionToReplicas);
+
+            assertFalse(result.isEmpty());
+            assertEquals("/tmp/broker0/logs0", result.get(new 
TopicPartitionReplica("foo", 0, 0)));
+            assertEquals("/tmp/broker0/logs0", result.get(new 
TopicPartitionReplica("foo", 0, 1)));
+            assertEquals("/tmp/broker0/logs0", result.get(new 
TopicPartitionReplica("foo", 0, 2)));
+            assertEquals("/tmp/broker1/logs0", result.get(new 
TopicPartitionReplica("foo", 1, 1)));
+            assertEquals("/tmp/broker1/logs0", result.get(new 
TopicPartitionReplica("foo", 1, 2)));
+            assertEquals("/tmp/broker1/logs0", result.get(new 
TopicPartitionReplica("foo", 1, 3)));
+            assertEquals("/tmp/broker2/logs0", result.get(new 
TopicPartitionReplica("bar", 0, 0)));
+            assertEquals("/tmp/broker2/logs0", result.get(new 
TopicPartitionReplica("bar", 0, 2)));
+            assertEquals("/tmp/broker2/logs0", result.get(new 
TopicPartitionReplica("bar", 0, 3)));
+        }
+    }
 }

Reply via email to