This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new d5efe5583a2 KAFKA-20558 NPE
ClusterInstance.waitUntilLeaderIsElectedOrChanged (#22234)
d5efe5583a2 is described below
commit d5efe5583a2c984f5b0938caf75c43237ec55d22
Author: Igor Soarez <[email protected]>
AuthorDate: Sat May 9 05:42:51 2026 +0100
KAFKA-20558 NPE ClusterInstance.waitUntilLeaderIsElectedOrChanged (#22234)
TopicPartitionInfo.leader() may return null, but the existing code in
ClusterInstance called .id() without a null check. Extract the logic
into a new AdminUtils class with null handling, delegate from
ClusterInstance, and add unit tests.
Reviewers: Chia-Ping Tsai <[email protected]>
---
build.gradle | 1 +
.../kafka/clients/admin/AddPartitionsTest.java | 9 ++-
.../kafka/clients/admin/DeleteTopicTest.java | 3 +-
.../org/apache/kafka/common/test/AdminUtils.java | 84 +++++++++++++++++++++
.../apache/kafka/common/test/ClusterInstance.java | 47 ------------
.../apache/kafka/common/test/AdminUtilsTest.java | 87 ++++++++++++++++++++++
.../kafka/tools/LeaderElectionCommandTest.java | 3 +-
7 files changed, 181 insertions(+), 53 deletions(-)
diff --git a/build.gradle b/build.gradle
index 0f90b9f2298..c047c521e94 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1668,6 +1668,7 @@ project(':test-common:test-common-runtime') {
implementation libs.slf4jApi
implementation testFixtures(project(':clients'))
+ testImplementation testFixtures(project(':clients'))
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
testImplementation testLog4j2Libs
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java
index afaec380867..2b8b1563381 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.common.test.AdminUtils;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterTest;
import org.apache.kafka.common.test.api.ClusterTestDefaults;
@@ -102,8 +103,8 @@ public class AddPartitionsTest {
admin.createPartitions(Map.of("topic1",
NewPartitions.increaseTo(3))).all().get();
- cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin,
"topic1", 1, 30000);
- cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin,
"topic1", 2, 30000);
+ AdminUtils.fetchOrWaitForLeader(admin, "topic1", 1, 30000);
+ AdminUtils.fetchOrWaitForLeader(admin, "topic1", 2, 30000);
cluster.waitTopicCreation("topic1", 3);
@@ -127,8 +128,8 @@ public class AddPartitionsTest {
admin.createPartitions(Map.of("topic2", NewPartitions.increaseTo(3,
List.of(List.of(0, 1), List.of(2, 3))))).all().get();
- cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin,
"topic2", 1, 30000);
- cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin,
"topic2", 2, 30000);
+ AdminUtils.fetchOrWaitForLeader(admin, "topic2", 1, 30000);
+ AdminUtils.fetchOrWaitForLeader(admin, "topic2", 2, 30000);
cluster.waitTopicCreation("topic2", 3);
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
index ee57fc6e86c..cc831facfb8 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
@@ -24,6 +24,7 @@ import
org.apache.kafka.common.errors.TopicDeletionDisabledException;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.record.internal.MemoryRecords;
import org.apache.kafka.common.record.internal.SimpleRecord;
+import org.apache.kafka.common.test.AdminUtils;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
@@ -221,7 +222,7 @@ public class DeleteTopicTest {
cluster.waitTopicDeletion(topic);
waitForReplicaCreated(cluster.brokers(), topicPartition, "Replicas
for topic test not created.");
- cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin,
DEFAULT_TOPIC, 0, 1000);
+ AdminUtils.fetchOrWaitForLeader(admin, DEFAULT_TOPIC, 0, 1000);
}
}
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/AdminUtils.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/AdminUtils.java
new file mode 100644
index 00000000000..39aa3ec0912
--- /dev/null
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/AdminUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.test;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+public final class AdminUtils {
+
+ private AdminUtils() {}
+
+ /**
+ * Fetch the partition leader or wait until one is elected using the
provided admin client.
+ */
+ public static int fetchOrWaitForLeader(Admin admin,
+ String topic,
+ int partitionNumber,
+ long timeoutMs) throws
InterruptedException {
+
+ var condition = new Supplier<Boolean>() {
+ int leader = Node.noNode().id();
+ @Override
+ public Boolean get() {
+ checkLeader();
+ return this.leader != Node.noNode().id();
+ }
+
+ public void checkLeader() {
+ try {
+ TopicDescription topicDescription =
admin.describeTopics(List.of(topic))
+ .allTopicNames().get().get(topic);
+
+ Optional<Integer> leader =
topicDescription.partitions().stream()
+ .filter(partitionInfo -> partitionInfo.partition()
== partitionNumber)
+ .findFirst()
+ .flatMap(partitionInfo ->
Optional.ofNullable(partitionInfo.leader()))
+ .map(node -> {
+ int leaderId = node.id();
+ return leaderId == Node.noNode().id() ? null :
leaderId;
+ });
+
+ leader.ifPresent(integer -> this.leader = integer);
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ boolean isTransient = cause instanceof
UnknownTopicOrPartitionException
+ || cause instanceof LeaderNotAvailableException;
+ if (!isTransient) {
+ throw new RuntimeException(e);
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
+ }
+ };
+
+ TestUtils.waitForCondition(condition, timeoutMs, "Timing out after %d
ms since a leader was not elected for partition %s-%d".formatted(timeoutMs,
topic, partitionNumber));
+
+ return condition.leader;
+ }
+}
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index f41fee0ff4f..21a63f13c01 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -35,13 +35,10 @@ import org.apache.kafka.clients.consumer.ShareConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -68,7 +65,6 @@ import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
@@ -415,47 +411,4 @@ public interface ClusterInstance {
.orElseThrow(() -> new RuntimeException("Leader not found
for tp " + topicPartition));
}
}
-
- /**
- * Wait for a leader to be elected or changed using the provided admin
client.
- */
- default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
- String topic,
- int partitionNumber,
- long timeoutMs)
throws Exception {
- long startTime = System.currentTimeMillis();
- TopicPartition topicPartition = new TopicPartition(topic,
partitionNumber);
-
- while (System.currentTimeMillis() < startTime + timeoutMs) {
- try {
- TopicDescription topicDescription =
admin.describeTopics(List.of(topic))
- .allTopicNames().get().get(topic);
-
- Optional<Integer> leader =
topicDescription.partitions().stream()
- .filter(partitionInfo -> partitionInfo.partition() ==
partitionNumber)
- .findFirst()
- .map(partitionInfo -> {
- int leaderId = partitionInfo.leader().id();
- return leaderId == Node.noNode().id() ? null :
leaderId;
- });
-
- if (leader.isPresent()) {
- return leader.get();
- }
- } catch (ExecutionException e) {
- Throwable cause = e.getCause();
- if (cause instanceof UnknownTopicOrPartitionException ||
- cause instanceof LeaderNotAvailableException) {
- continue;
- } else {
- throw e;
- }
- }
-
- TimeUnit.MILLISECONDS.sleep(Math.min(100L, timeoutMs));
- }
-
- throw new AssertionError("Timing out after " + timeoutMs +
- " ms since a leader was not elected for partition " +
topicPartition);
- }
}
diff --git
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/AdminUtilsTest.java
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/AdminUtilsTest.java
new file mode 100644
index 00000000000..b4002734a40
--- /dev/null
+++
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/AdminUtilsTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.test;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("resource")
+public class AdminUtilsTest {
+
+ @Test
+ void testFetchOrWaitForLeader() throws Exception {
+ String topic = "test-topic";
+ int partition = 0;
+ Admin admin = mock(Admin.class);
+ when(admin.describeTopics(anyCollection())).thenAnswer(new
Answer<Object>() {
+ boolean called = false;
+
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable
{
+ if (called) {
+ return resultWithLeader(new Node(0, "", 0));
+ } else {
+ called = true;
+ return resultWithLeader(new Node(1, "", 0));
+ }
+ }
+
+ DescribeTopicsResult resultWithLeader(Node leader) {
+ TopicPartitionInfo topicPartitionInfo = new
TopicPartitionInfo(partition, leader, List.of(), List.of());
+ return AdminClientTestUtils.describeTopicsResult(topic,
+ new TopicDescription(topic, false,
List.of(topicPartitionInfo)));
+ }
+ });
+
+ int result = AdminUtils.fetchOrWaitForLeader(admin, topic, partition,
1000);
+
+ assertEquals(1, result);
+ }
+
+ @Test
+ void testFetchOrWaitForLeaderTimesOut() throws Exception {
+ String topic = "test-topic";
+ int partition = 0;
+ Node leader = null;
+ TopicPartitionInfo topicPartitionInfo = new
TopicPartitionInfo(partition, leader, List.of(), List.of());
+ DescribeTopicsResult describeResult =
AdminClientTestUtils.describeTopicsResult(topic,
+ new TopicDescription(topic, false,
List.of(topicPartitionInfo)));
+ Admin admin = mock(Admin.class);
+ when(admin.describeTopics(anyCollection())).thenReturn(describeResult);
+
+ assertThrows(AssertionError.class, () ->
+ AdminUtils.fetchOrWaitForLeader(admin, topic,
partition, 1),
+ "Timing out after 1 ms since a leader was not elected for
partition test-topic-0");
+ }
+}
diff --git
a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
index d3d646cae7e..4111563e99e 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.test.AdminUtils;
import org.apache.kafka.common.test.ClusterInstance;
import org.apache.kafka.common.test.api.ClusterConfigProperty;
import org.apache.kafka.common.test.api.ClusterTest;
@@ -446,7 +447,7 @@ public class LeaderElectionCommandTest {
}
private void assertLeader(Admin client, TopicPartition topicPartition, int
expectedLeader) throws Exception {
- int leader =
cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(client,
topicPartition.topic(), topicPartition.partition(), 30000);
+ int leader = AdminUtils.fetchOrWaitForLeader(client,
topicPartition.topic(), topicPartition.partition(), 30000);
assertEquals(expectedLeader, leader);
}