This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 47bb46c10dc KAFKA-19582 the current assignments shown by ReassignPartitionsCommand should include the log directories (#20319)
47bb46c10dc is described below

commit 47bb46c10dc82dc6ba584ca1659400d899f92893
Author: Ken Huang <s7133...@gmail.com>
AuthorDate: Sat Aug 23 02:57:00 2025 +0800

    KAFKA-19582 the current assignments shown by ReassignPartitionsCommand should include the log directories (#20319)

    The ReassignPartitionsCommand shows the topic replicas on each broker.
    When using the --generate option, it returns the current partition
    replica assignment. However, the log directory for each current replica
    is always shown as "any". This makes it impossible for users to determine
    which specific log directory is being used by each replica. Therefore, we
    should fix this behavior.

    ```
    Current partition replica assignment
    {
      "version": 1,
      "partitions": [
        {
          "topic": "test1",
          "partition": 0,
          "replicas": [
            4,
            2
          ],
          "log_dirs": [
            "any",
            "any"
          ]
        }
      ]
    }
    ```

    With this PR:

    ```
    Current partition replica assignment
    {
      "version": 1,
      "partitions": [
        {
          "topic": "test1",
          "partition": 0,
          "replicas": [
            4,
            2
          ],
          "log_dirs": [
            "/tmp/kraft-broker-logs234",
            "/tmp/kraft-broker-logs"
          ]
        }
      ]
    }
    ```

    Reviewers: PoAn Yang <pay...@apache.org>, Jhen-Yung Hsu <jhenyung...@gmail.com>, TaiJuWu <tjwu1...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../tools/reassign/ReassignPartitionsCommand.java  |  36 ++++++--
 .../tools/reassign/ReassignPartitionsUnitTest.java | 100 ++++++++++++++++-----
 2 files changed, 109 insertions(+), 27 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 4628c34fe18..8e1f6114c24 100644
--- a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ 
-566,10 +566,11 @@ public class ReassignPartitionsCommand { List<String> topicsToReassign = t0.getValue(); Map<TopicPartition, List<Integer>> currentAssignments = getReplicaAssignmentForTopics(adminClient, topicsToReassign); + Map<TopicPartitionReplica, String> currentReplicaLogDirs = getReplicaToLogDir(adminClient, currentAssignments); List<UsableBroker> usableBrokers = getBrokerMetadata(adminClient, brokersToReassign, enableRackAwareness); Map<TopicPartition, List<Integer>> proposedAssignments = calculateAssignment(currentAssignments, usableBrokers); System.out.printf("Current partition replica assignment%n%s%n%n", - formatAsReassignmentJson(currentAssignments, Map.of())); + formatAsReassignmentJson(currentAssignments, currentReplicaLogDirs)); System.out.printf("Proposed partition reassignment configuration%n%s%n", formatAsReassignmentJson(proposedAssignments, Map.of())); return Map.entry(proposedAssignments, currentAssignments); @@ -775,7 +776,7 @@ public class ReassignPartitionsCommand { verifyBrokerIds(adminClient, brokers); Map<TopicPartition, List<Integer>> currentParts = getReplicaAssignmentForPartitions(adminClient, proposedParts.keySet()); - System.out.println(currentPartitionReplicaAssignmentToString(proposedParts, currentParts)); + System.out.println(currentPartitionReplicaAssignmentToString(adminClient, proposedParts, currentParts)); if (interBrokerThrottle >= 0 || logDirThrottle >= 0) { System.out.println(YOU_MUST_RUN_VERIFY_PERIODICALLY_MESSAGE); @@ -916,20 +917,23 @@ public class ReassignPartitionsCommand { /** * Return the string which we want to print to describe the current partition assignment. * + * @param adminClient The admin client object to use. * @param proposedParts The proposed partition assignment. * @param currentParts The current partition assignment. * * @return The string to print. We will only print information about * partitions that appear in the proposed partition assignment. 
*/ - static String currentPartitionReplicaAssignmentToString(Map<TopicPartition, List<Integer>> proposedParts, - Map<TopicPartition, List<Integer>> currentParts) throws JsonProcessingException { + static String currentPartitionReplicaAssignmentToString(Admin adminClient, + Map<TopicPartition, List<Integer>> proposedParts, + Map<TopicPartition, List<Integer>> currentParts) throws JsonProcessingException, ExecutionException, InterruptedException { Map<TopicPartition, List<Integer>> partitionsToBeReassigned = currentParts.entrySet().stream() .filter(e -> proposedParts.containsKey(e.getKey())) .collect(Collectors.toMap(Entry::getKey, Entry::getValue)); + Map<TopicPartitionReplica, String> currentReplicaLogDirs = getReplicaToLogDir(adminClient, partitionsToBeReassigned); return String.format("Current partition replica assignment%n%n%s%n%nSave this to use as the %s", - formatAsReassignmentJson(partitionsToBeReassigned, Map.of()), + formatAsReassignmentJson(partitionsToBeReassigned, currentReplicaLogDirs), "--reassignment-json-file option during rollback"); } @@ -1514,4 +1518,26 @@ public class ReassignPartitionsCommand { } return results; } + + static Map<TopicPartitionReplica, String> getReplicaToLogDir( + Admin adminClient, + Map<TopicPartition, List<Integer>> topicPartitionToReplicas + ) throws InterruptedException, ExecutionException { + var replicaLogDirs = topicPartitionToReplicas + .entrySet() + .stream() + .flatMap(entry -> entry.getValue() + .stream() + .map(id -> new TopicPartitionReplica(entry.getKey().topic(), entry.getKey().partition(), id))) + .collect(Collectors.toUnmodifiableSet()); + + return adminClient.describeReplicaLogDirs(replicaLogDirs).all().get() + .entrySet() + .stream() + .filter(entry -> entry.getValue().getCurrentReplicaLogDir() != null) + .collect(Collectors.toMap( + Entry::getKey, + entry -> entry.getValue().getCurrentReplicaLogDir() + )); + } } diff --git a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java 
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java index 949b595a115..22e9011a2c2 100644 --- a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java @@ -71,6 +71,7 @@ import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.generate import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getBrokerMetadata; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForPartitions; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForTopics; +import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaToLogDir; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyInterBrokerThrottle; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyLogDirThrottle; import static org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyTopicThrottles; @@ -436,29 +437,50 @@ public class ReassignPartitionsUnitTest { @Test public void testCurrentPartitionReplicaAssignmentToString() throws Exception { - Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>(); - - proposedParts.put(new TopicPartition("foo", 1), List.of(1, 2, 3)); - proposedParts.put(new TopicPartition("bar", 0), List.of(7, 8, 9)); - - Map<TopicPartition, List<Integer>> currentParts = new HashMap<>(); - - currentParts.put(new TopicPartition("foo", 0), List.of(1, 2, 3)); - currentParts.put(new TopicPartition("foo", 1), List.of(4, 5, 6)); - currentParts.put(new TopicPartition("bar", 0), List.of(7, 8)); - currentParts.put(new TopicPartition("baz", 0), List.of(10, 11, 12)); + try (MockAdminClient adminClient = new MockAdminClient.Builder() + .numBrokers(6) + .brokerLogDirs(List.of( + List.of("/tmp/broker0/logs"), + List.of("/tmp/broker1/logs"), + List.of("/tmp/broker2/logs"), + 
List.of("/tmp/broker3/logs"), + List.of("/tmp/broker4/logs"), + List.of("/tmp/broker5/logs") + )) + .build() + ) { + + List<Node> brokers = adminClient.brokers(); + adminClient.addTopic(false, "foo", List.of( + new TopicPartitionInfo(1, brokers.get(1), + List.of(brokers.get(1), brokers.get(2), brokers.get(3)), + List.of(brokers.get(1), brokers.get(2), brokers.get(3))) + ), Map.of()); + + adminClient.addTopic(false, "bar", List.of( + new TopicPartitionInfo(0, brokers.get(4), + List.of(brokers.get(4), brokers.get(5)), + List.of(brokers.get(4), brokers.get(5))) + ), Map.of()); + + Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>(); + proposedParts.put(new TopicPartition("foo", 1), List.of(0, 1, 2)); + proposedParts.put(new TopicPartition("bar", 0), List.of(3, 4, 5)); + + Map<TopicPartition, List<Integer>> currentParts = new HashMap<>(); + currentParts.put(new TopicPartition("foo", 1), List.of(1, 2, 3)); + currentParts.put(new TopicPartition("bar", 0), List.of(4, 5)); - assertEquals(String.join(System.lineSeparator(), - "Current partition replica assignment", - "", - "{\"version\":1,\"partitions\":" + - "[{\"topic\":\"bar\",\"partition\":0,\"replicas\":[7,8],\"log_dirs\":[\"any\",\"any\"]}," + - "{\"topic\":\"foo\",\"partition\":1,\"replicas\":[4,5,6],\"log_dirs\":[\"any\",\"any\",\"any\"]}]" + - "}", - "", - "Save this to use as the --reassignment-json-file option during rollback"), - currentPartitionReplicaAssignmentToString(proposedParts, currentParts) - ); + assertEquals(String.join(System.lineSeparator(), + "Current partition replica assignment", + "", + "{\"version\":1,\"partitions\":[{\"topic\":\"bar\",\"partition\":0,\"replicas\":[4,5],\"log_dirs\":[\"/tmp/broker4/logs\",\"/tmp/broker4/logs\"]}," + + "{\"topic\":\"foo\",\"partition\":1,\"replicas\":[1,2,3],\"log_dirs\":[\"any\",\"any\",\"any\"]}]}", + "", + "Save this to use as the --reassignment-json-file option during rollback"), + currentPartitionReplicaAssignmentToString(adminClient, 
proposedParts, currentParts) + ); + } } @Test @@ -765,4 +787,38 @@ public class ReassignPartitionsUnitTest { assertThrows(AdminOperationException.class, () -> executeAssignment(adminClient, false, "{invalid_json", -1L, -1L, 10000L, Time.SYSTEM, false)).getMessage()); } } + + @Test + public void testGetReplicaToLogDir() throws Exception { + try (MockAdminClient adminClient = new MockAdminClient.Builder() + .numBrokers(4) + .brokerLogDirs(List.of( + List.of("/tmp/broker0/logs0"), + List.of("/tmp/broker1/logs0"), + List.of("/tmp/broker2/logs0"), + List.of("/tmp/broker3/logs0") + )).build() + ) { + addTopics(adminClient); + + Map<TopicPartition, List<Integer>> topicPartitionToReplicas = Map.of( + new TopicPartition("foo", 0), List.of(0, 1, 2), + new TopicPartition("foo", 1), List.of(1, 2, 3), + new TopicPartition("bar", 0), List.of(2, 3, 0) + ); + + Map<TopicPartitionReplica, String> result = getReplicaToLogDir(adminClient, topicPartitionToReplicas); + + assertFalse(result.isEmpty()); + assertEquals("/tmp/broker0/logs0", result.get(new TopicPartitionReplica("foo", 0, 0))); + assertEquals("/tmp/broker0/logs0", result.get(new TopicPartitionReplica("foo", 0, 1))); + assertEquals("/tmp/broker0/logs0", result.get(new TopicPartitionReplica("foo", 0, 2))); + assertEquals("/tmp/broker1/logs0", result.get(new TopicPartitionReplica("foo", 1, 1))); + assertEquals("/tmp/broker1/logs0", result.get(new TopicPartitionReplica("foo", 1, 2))); + assertEquals("/tmp/broker1/logs0", result.get(new TopicPartitionReplica("foo", 1, 3))); + assertEquals("/tmp/broker2/logs0", result.get(new TopicPartitionReplica("bar", 0, 0))); + assertEquals("/tmp/broker2/logs0", result.get(new TopicPartitionReplica("bar", 0, 2))); + assertEquals("/tmp/broker2/logs0", result.get(new TopicPartitionReplica("bar", 0, 3))); + } + } }