This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.2 by this push:
new c90e97ac1fd KAFKA-20023 Fix kafka-reassign-partitions.sh to handle
dead brokers (#21222)
c90e97ac1fd is described below
commit c90e97ac1fdda7dab4ceeb8771b099193b7d51ab
Author: Ken Huang <[email protected]>
AuthorDate: Sun Jan 4 00:34:26 2026 +0800
KAFKA-20023 Fix kafka-reassign-partitions.sh to handle dead brokers (#21222)
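Return Node objects instead of bare broker IDs from the replica-lookup helpers,
and use Node#isEmpty() to filter out replicas on dead (offline) brokers before
querying their log directories.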
Test results:
While broker 4 is alive:
```
./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:19092
--topics-to-move-json-file tmp.json --broker-list "2,3,4" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[3,4],"log_dirs":["/tmp/kraft-broker-logs-1","/tmp/kraft-broker-logs-2"]},{"topic":"test1","partition":1,"replicas":[4,2],"log_dirs":["/tmp/kraft-broker-logs-2","/tmp/kraft-broker-logs"]}]}
```
After broker 4 is shut down:
```
./bin/kafka-reassign-partitions.sh --bootstrap-server localhost:19092
--topics-to-move-json-file tmp.json --broker-list "2,3,4" --generate
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[3,4],"log_dirs":["/tmp/kraft-broker-logs-1","any"]},{"topic":"test1","partition":1,"replicas":[4,2],"log_dirs":["any","/tmp/kraft-broker-logs"]}]}
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test1","partition":0,"replicas":[3,2],"log_dirs":["any","any"]},{"topic":"test1","partition":1,"replicas":[2,3],"log_dirs":["any","any"]}]}
```
Reviewers: PoAn Yang <[email protected]>, Andrew Schofield
<[email protected]>, Chia-Ping Tsai <[email protected]>
---
.../tools/reassign/ReassignPartitionsCommand.java | 115 +++++++++++++++------
.../reassign/ReassignPartitionsCommandTest.java | 43 ++++++++
.../tools/reassign/ReassignPartitionsUnitTest.java | 58 +++++++----
3 files changed, 164 insertions(+), 52 deletions(-)
diff --git
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
index 8e1f6114c24..1933e4e3526 100644
---
a/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
+++
b/tools/src/main/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommand.java
@@ -565,15 +565,16 @@ public class ReassignPartitionsCommand {
List<Integer> brokersToReassign = t0.getKey();
List<String> topicsToReassign = t0.getValue();
- Map<TopicPartition, List<Integer>> currentAssignments =
getReplicaAssignmentForTopics(adminClient, topicsToReassign);
+ Map<TopicPartition, List<Node>> currentAssignments =
getReplicaAssignmentForTopics(adminClient, topicsToReassign);
Map<TopicPartitionReplica, String> currentReplicaLogDirs =
getReplicaToLogDir(adminClient, currentAssignments);
List<UsableBroker> usableBrokers = getBrokerMetadata(adminClient,
brokersToReassign, enableRackAwareness);
- Map<TopicPartition, List<Integer>> proposedAssignments =
calculateAssignment(currentAssignments, usableBrokers);
+ Map<TopicPartition, List<Integer>> currentParts =
toReplicaIds(currentAssignments);
+ Map<TopicPartition, List<Integer>> proposedAssignments =
calculateAssignment(currentParts, usableBrokers);
System.out.printf("Current partition replica assignment%n%s%n%n",
- formatAsReassignmentJson(currentAssignments,
currentReplicaLogDirs));
+ formatAsReassignmentJson(currentParts, currentReplicaLogDirs));
System.out.printf("Proposed partition reassignment
configuration%n%s%n",
formatAsReassignmentJson(proposedAssignments, Map.of()));
- return Map.entry(proposedAssignments, currentAssignments);
+ return Map.entry(proposedAssignments, currentParts);
}
/**
@@ -642,14 +643,14 @@ public class ReassignPartitionsCommand {
* @return A map from partitions to broker assignments.
* If any topic can't be found, an exception will
be thrown.
*/
- static Map<TopicPartition, List<Integer>>
getReplicaAssignmentForTopics(Admin adminClient,
-
List<String> topics
+ static Map<TopicPartition, List<Node>> getReplicaAssignmentForTopics(Admin
adminClient,
+
List<String> topics
) throws ExecutionException, InterruptedException {
- Map<TopicPartition, List<Integer>> res = new HashMap<>();
+ Map<TopicPartition, List<Node>> res = new HashMap<>();
describeTopics(adminClient, new HashSet<>(topics)).forEach((topicName,
topicDescription) ->
topicDescription.partitions().forEach(info -> res.put(
new TopicPartition(topicName, info.partition()),
-
info.replicas().stream().map(Node::id).collect(Collectors.toList())
+ info.replicas()
)
));
return res;
@@ -663,15 +664,15 @@ public class ReassignPartitionsCommand {
* @return A map from partitions to broker assignments.
* If any topic or partition can't be found, an
exception will be thrown.
*/
- static Map<TopicPartition, List<Integer>>
getReplicaAssignmentForPartitions(Admin adminClient,
-
Set<TopicPartition> partitions
+ static Map<TopicPartition, List<Node>> getReplicasForPartitions(Admin
adminClient,
+
Set<TopicPartition> partitions
) throws ExecutionException, InterruptedException {
- Map<TopicPartition, List<Integer>> res = new HashMap<>();
+ Map<TopicPartition, List<Node>> res = new HashMap<>();
describeTopics(adminClient,
partitions.stream().map(TopicPartition::topic).collect(Collectors.toSet())).forEach((topicName,
topicDescription) ->
topicDescription.partitions().forEach(info -> {
TopicPartition tp = new TopicPartition(topicName,
info.partition());
if (partitions.contains(tp))
- res.put(tp,
info.replicas().stream().map(Node::id).collect(Collectors.toList()));
+ res.put(tp, info.replicas());
})
);
@@ -684,6 +685,17 @@ public class ReassignPartitionsCommand {
return res;
}
+ static Map<TopicPartition, List<Integer>> toReplicaIds(
+ Map<TopicPartition, List<Node>> replicaAssignmentForPartitions
+ ) {
+ return replicaAssignmentForPartitions.entrySet()
+ .stream()
+ .collect(Collectors.toMap(
+ Entry::getKey,
+ e ->
e.getValue().stream().map(Node::id).collect(Collectors.toList())
+ ));
+ }
+
/**
* Find the rack information for some brokers.
*
@@ -775,8 +787,10 @@ public class ReassignPartitionsCommand {
proposedParts.values().forEach(brokers::addAll);
verifyBrokerIds(adminClient, brokers);
- Map<TopicPartition, List<Integer>> currentParts =
getReplicaAssignmentForPartitions(adminClient, proposedParts.keySet());
-
System.out.println(currentPartitionReplicaAssignmentToString(adminClient,
proposedParts, currentParts));
+ Map<TopicPartition, List<Node>> currentPartsToNode =
getReplicasForPartitions(adminClient, proposedParts.keySet());
+ Map<TopicPartition, List<Integer>> currentParts =
toReplicaIds(currentPartsToNode);
+
+
System.out.println(currentPartitionReplicaAssignmentToString(adminClient,
proposedParts, currentPartsToNode));
if (interBrokerThrottle >= 0 || logDirThrottle >= 0) {
System.out.println(YOU_MUST_RUN_VERIFY_PERIODICALLY_MESSAGE);
@@ -919,21 +933,31 @@ public class ReassignPartitionsCommand {
*
* @param adminClient The admin client object to use.
* @param proposedParts The proposed partition assignment.
- * @param currentParts The current partition assignment.
+ * @param currentAssignments The current partition assignment
with Node information.
*
* @return The string to print. We will only
print information about
* partitions that appear in the
proposed partition assignment.
*/
- static String currentPartitionReplicaAssignmentToString(Admin adminClient,
-
Map<TopicPartition, List<Integer>> proposedParts,
-
Map<TopicPartition, List<Integer>> currentParts) throws
JsonProcessingException, ExecutionException, InterruptedException {
- Map<TopicPartition, List<Integer>> partitionsToBeReassigned =
currentParts.entrySet().stream()
+ static String currentPartitionReplicaAssignmentToString(
+ Admin adminClient,
+ Map<TopicPartition, List<Integer>> proposedParts,
+ Map<TopicPartition, List<Node>> currentAssignments
+ ) throws JsonProcessingException, ExecutionException, InterruptedException
{
+
+ Map<TopicPartition, List<Node>> partitionsToBeReassigned =
currentAssignments.entrySet()
+ .stream()
.filter(e -> proposedParts.containsKey(e.getKey()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));
- Map<TopicPartitionReplica, String> currentReplicaLogDirs =
getReplicaToLogDir(adminClient, partitionsToBeReassigned);
+
+ Map<TopicPartitionReplica, String> currentReplicaLogDirs =
getReplicaToLogDir(
+ adminClient,
+ partitionsToBeReassigned
+ );
+
+ Map<TopicPartition, List<Integer>> currentParts =
toReplicaIds(partitionsToBeReassigned);
return String.format("Current partition replica
assignment%n%n%s%n%nSave this to use as the %s",
- formatAsReassignmentJson(partitionsToBeReassigned,
currentReplicaLogDirs),
+ formatAsReassignmentJson(currentParts, currentReplicaLogDirs),
"--reassignment-json-file option during rollback");
}
@@ -1519,25 +1543,48 @@ public class ReassignPartitionsCommand {
return results;
}
+ /**
+ * Get the log directory for each replica.
+ *
+ * @param adminClient The admin client object to use.
+ * @param current The current partition assignment with Node information.
+ * @return Map of TopicPartitionReplica to log directory path.
+ */
static Map<TopicPartitionReplica, String> getReplicaToLogDir(
Admin adminClient,
- Map<TopicPartition, List<Integer>> topicPartitionToReplicas
- ) throws InterruptedException, ExecutionException {
- var replicaLogDirs = topicPartitionToReplicas
- .entrySet()
- .stream()
- .flatMap(entry -> entry.getValue()
- .stream()
- .map(id -> new
TopicPartitionReplica(entry.getKey().topic(), entry.getKey().partition(), id)))
- .collect(Collectors.toUnmodifiableSet());
+ Map<TopicPartition, List<Node>> current
+ ) throws ExecutionException, InterruptedException {
+ List<TopicPartitionReplica> availableReplicas = available(current);
+
+ if (availableReplicas.isEmpty()) {
+ return Map.of();
+ }
- return adminClient.describeReplicaLogDirs(replicaLogDirs).all().get()
+ return
adminClient.describeReplicaLogDirs(availableReplicas).all().get()
.entrySet()
.stream()
- .filter(entry -> entry.getValue().getCurrentReplicaLogDir() !=
null)
+ .filter(e -> e.getValue().getCurrentReplicaLogDir() != null)
.collect(Collectors.toMap(
Entry::getKey,
- entry -> entry.getValue().getCurrentReplicaLogDir()
- ));
+ e -> e.getValue().getCurrentReplicaLogDir())
+ );
+ }
+
+ /**
+ * Extract available (non-empty) replicas from the assignment.
+ */
+ private static List<TopicPartitionReplica> available(Map<TopicPartition,
List<Node>> current) {
+ return current.entrySet()
+ .stream()
+ .flatMap(entry -> entry.getValue()
+ .stream()
+ .filter(node -> !node.isEmpty())
+ .map(node -> new TopicPartitionReplica(
+ entry.getKey().topic(),
+ entry.getKey().partition(),
+ node.id()
+ ))
+ )
+ .toList();
}
}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
index a8d830b4bd4..a71e4779a3e 100644
---
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java
@@ -502,6 +502,49 @@ public class ReassignPartitionsCommandTest {
}
}
+ @ClusterTest(types = {Type.KRAFT})
+ public void
testGenerateAssignmentWithOneBootstrapServerShutdownWontTimeout() throws
Exception {
+ var brokerIdToShutdown = 0;
+ createTopics();
+ var foo0 = new TopicPartition("foo", 0);
+ produceMessages(foo0.topic(), foo0.partition(), 100);
+
+ try (Admin admin =
Admin.create(Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers()))) {
+ String topicsToMoveJson = """
+ {
+ "topics": [
+ { "topic": "foo" }
+ ],
+ "version": 1
+ }
+ """;
+ clusterInstance.shutdownBroker(brokerIdToShutdown);
+ TestUtils.waitForCondition(
+ () -> clusterInstance.aliveBrokers().size() == 4,
+ "Waiting for broker to shutdown failed"
+ );
+ generateAssignment(admin, topicsToMoveJson, "1,2,3", false);
+ }
+ }
+
+ @ClusterTest(types = {Type.KRAFT})
+ public void
testExecuteAssignmentWithOneBootstrapServerShutdownWontTimeout() throws
Exception {
+ var brokerIdToShutdown = 0;
+ createTopics();
+ var foo0 = new TopicPartition("foo", 0);
+ produceMessages(foo0.topic(), foo0.partition(), 100);
+ clusterInstance.shutdownBroker(brokerIdToShutdown);
+ TestUtils.waitForCondition(
+ () -> clusterInstance.aliveBrokers().size() == 4,
+ "Waiting for broker to shutdown failed"
+ );
+ // Execute the assignment
+ String assignment = "{\"version\":1,\"partitions\":" +
+
"[{\"topic\":\"foo\",\"partition\":0,\"replicas\":[3,1,2],\"log_dirs\":[\"any\",\"any\",\"any\"]}"
+
+ "]}";
+ runExecuteAssignment(false, assignment, -1L, -1L);
+ }
+
private void createTopics() {
try (Admin admin =
Admin.create(Map.of(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
clusterInstance.bootstrapServers()))) {
Map<Integer, List<Integer>> fooReplicasAssignments = new
HashMap<>();
diff --git
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
index 22e9011a2c2..d953ccf9ea7 100644
---
a/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
+++
b/tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsUnitTest.java
@@ -69,7 +69,6 @@ import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.findLogD
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.findPartitionReassignmentStates;
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.generateAssignment;
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getBrokerMetadata;
-import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForPartitions;
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaAssignmentForTopics;
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.getReplicaToLogDir;
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.modifyInterBrokerThrottle;
@@ -79,6 +78,7 @@ import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.parseExe
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.parseGenerateAssignmentArgs;
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.partitionReassignmentStatesToString;
import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.replicaMoveStatesToString;
+import static
org.apache.kafka.tools.reassign.ReassignPartitionsCommand.toReplicaIds;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
@@ -290,20 +290,30 @@ public class ReassignPartitionsUnitTest {
assignments.put(new TopicPartition("foo", 0), List.of(0, 1, 2));
assignments.put(new TopicPartition("foo", 1), List.of(1, 2, 3));
- assertEquals(assignments,
getReplicaAssignmentForTopics(adminClient, List.of("foo")));
+ assertEquals(
+ assignments,
+ toReplicaIds(getReplicaAssignmentForTopics(adminClient,
List.of("foo")))
+ );
assignments.clear();
assignments.put(new TopicPartition("foo", 0), List.of(0, 1, 2));
assignments.put(new TopicPartition("bar", 0), List.of(2, 3, 0));
- assertEquals(assignments,
- getReplicaAssignmentForPartitions(adminClient, Set.of(new
TopicPartition("foo", 0), new TopicPartition("bar", 0))));
+ Map<TopicPartition, List<Integer>> actualAssignments =
toReplicaIds(
+ ReassignPartitionsCommand.getReplicasForPartitions(
+ adminClient,
+ Set.of(new TopicPartition("foo", 0), new
TopicPartition("bar", 0))
+ ));
+ assertEquals(
+ assignments,
+ actualAssignments
+ );
UnknownTopicOrPartitionException exception =
assertInstanceOf(UnknownTopicOrPartitionException.class,
assertThrows(ExecutionException.class,
- () -> getReplicaAssignmentForPartitions(adminClient,
+ () ->
ReassignPartitionsCommand.getReplicasForPartitions(adminClient,
Set.of(new TopicPartition("foo", 0), new
TopicPartition("foo", 10)))).getCause());
assertEquals("Unable to find partition: foo-10",
exception.getMessage());
}
@@ -451,25 +461,31 @@ public class ReassignPartitionsUnitTest {
) {
List<Node> brokers = adminClient.brokers();
+ Node broker1 = brokers.get(1);
+ Node broker2 = brokers.get(2);
+ Node broker3 = brokers.get(3);
+ Node broker4 = brokers.get(4);
+ Node broker5 = brokers.get(5);
+
adminClient.addTopic(false, "foo", List.of(
- new TopicPartitionInfo(1, brokers.get(1),
- List.of(brokers.get(1), brokers.get(2), brokers.get(3)),
- List.of(brokers.get(1), brokers.get(2), brokers.get(3)))
+ new TopicPartitionInfo(1, broker1,
+ List.of(broker1, broker2, broker3),
+ List.of(broker1, broker2, broker3))
), Map.of());
adminClient.addTopic(false, "bar", List.of(
- new TopicPartitionInfo(0, brokers.get(4),
- List.of(brokers.get(4), brokers.get(5)),
- List.of(brokers.get(4), brokers.get(5)))
+ new TopicPartitionInfo(0, broker4,
+ List.of(broker4, broker5),
+ List.of(broker4, broker5))
), Map.of());
Map<TopicPartition, List<Integer>> proposedParts = new HashMap<>();
proposedParts.put(new TopicPartition("foo", 1), List.of(0, 1, 2));
proposedParts.put(new TopicPartition("bar", 0), List.of(3, 4, 5));
- Map<TopicPartition, List<Integer>> currentParts = new HashMap<>();
- currentParts.put(new TopicPartition("foo", 1), List.of(1, 2, 3));
- currentParts.put(new TopicPartition("bar", 0), List.of(4, 5));
+ Map<TopicPartition, List<Node>> currentParts = new HashMap<>();
+ currentParts.put(new TopicPartition("foo", 1), List.of(broker1,
broker2, broker3));
+ currentParts.put(new TopicPartition("bar", 0), List.of(broker4,
broker5));
assertEquals(String.join(System.lineSeparator(),
"Current partition replica assignment",
@@ -801,10 +817,16 @@ public class ReassignPartitionsUnitTest {
) {
addTopics(adminClient);
- Map<TopicPartition, List<Integer>> topicPartitionToReplicas =
Map.of(
- new TopicPartition("foo", 0), List.of(0, 1, 2),
- new TopicPartition("foo", 1), List.of(1, 2, 3),
- new TopicPartition("bar", 0), List.of(2, 3, 0)
+ List<Node> brokers = adminClient.brokers();
+ Node broker0 = brokers.get(0);
+ Node broker1 = brokers.get(1);
+ Node broker2 = brokers.get(2);
+ Node broker3 = brokers.get(3);
+
+ Map<TopicPartition, List<Node>> topicPartitionToReplicas = Map.of(
+ new TopicPartition("foo", 0), List.of(broker0, broker1,
broker2),
+ new TopicPartition("foo", 1), List.of(broker1, broker2,
broker3),
+ new TopicPartition("bar", 0), List.of(broker2, broker3,
broker0)
);
Map<TopicPartitionReplica, String> result =
getReplicaToLogDir(adminClient, topicPartitionToReplicas);