This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d5efe5583a2 KAFKA-20558 NPE 
ClusterInstance.waitUntilLeaderIsElectedOrChanged (#22234)
d5efe5583a2 is described below

commit d5efe5583a2c984f5b0938caf75c43237ec55d22
Author: Igor Soarez <[email protected]>
AuthorDate: Sat May 9 05:42:51 2026 +0100

    KAFKA-20558 NPE ClusterInstance.waitUntilLeaderIsElectedOrChanged (#22234)
    
    TopicPartitionInfo.leader() may return null, but the existing code in
    ClusterInstance called .id() without a null check. Extract the logic
    into a new AdminUtils class with null handling, delegate from
    ClusterInstance, and add unit tests.
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 build.gradle                                       |  1 +
 .../kafka/clients/admin/AddPartitionsTest.java     |  9 ++-
 .../kafka/clients/admin/DeleteTopicTest.java       |  3 +-
 .../org/apache/kafka/common/test/AdminUtils.java   | 84 +++++++++++++++++++++
 .../apache/kafka/common/test/ClusterInstance.java  | 47 ------------
 .../apache/kafka/common/test/AdminUtilsTest.java   | 87 ++++++++++++++++++++++
 .../kafka/tools/LeaderElectionCommandTest.java     |  3 +-
 7 files changed, 181 insertions(+), 53 deletions(-)

diff --git a/build.gradle b/build.gradle
index 0f90b9f2298..c047c521e94 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1668,6 +1668,7 @@ project(':test-common:test-common-runtime') {
     implementation libs.slf4jApi
     implementation testFixtures(project(':clients'))
 
+    testImplementation testFixtures(project(':clients'))
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
     testImplementation testLog4j2Libs
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java
index afaec380867..2b8b1563381 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/AddPartitionsTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.errors.InvalidReplicaAssignmentException;
+import org.apache.kafka.common.test.AdminUtils;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTestDefaults;
@@ -102,8 +103,8 @@ public class AddPartitionsTest {
 
             admin.createPartitions(Map.of("topic1", 
NewPartitions.increaseTo(3))).all().get();
 
-            cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, 
"topic1", 1, 30000);
-            cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, 
"topic1", 2, 30000);
+            AdminUtils.fetchOrWaitForLeader(admin, "topic1", 1, 30000);
+            AdminUtils.fetchOrWaitForLeader(admin, "topic1", 2, 30000);
 
             cluster.waitTopicCreation("topic1", 3);
 
@@ -127,8 +128,8 @@ public class AddPartitionsTest {
             admin.createPartitions(Map.of("topic2", NewPartitions.increaseTo(3,
                 List.of(List.of(0, 1), List.of(2, 3))))).all().get();
 
-            cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, 
"topic2", 1, 30000);
-            cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, 
"topic2", 2, 30000);
+            AdminUtils.fetchOrWaitForLeader(admin, "topic2", 1, 30000);
+            AdminUtils.fetchOrWaitForLeader(admin, "topic2", 2, 30000);
 
             cluster.waitTopicCreation("topic2", 3);
 
diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
index ee57fc6e86c..cc831facfb8 100644
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
+++ 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/DeleteTopicTest.java
@@ -24,6 +24,7 @@ import 
org.apache.kafka.common.errors.TopicDeletionDisabledException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.record.internal.MemoryRecords;
 import org.apache.kafka.common.record.internal.SimpleRecord;
+import org.apache.kafka.common.test.AdminUtils;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
@@ -221,7 +222,7 @@ public class DeleteTopicTest {
             cluster.waitTopicDeletion(topic);
 
             waitForReplicaCreated(cluster.brokers(), topicPartition, "Replicas 
for topic test not created.");
-            cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, 
DEFAULT_TOPIC, 0, 1000);
+            AdminUtils.fetchOrWaitForLeader(admin, DEFAULT_TOPIC, 0, 1000);
         }
     }
 
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/AdminUtils.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/AdminUtils.java
new file mode 100644
index 00000000000..39aa3ec0912
--- /dev/null
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/AdminUtils.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.test;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.LeaderNotAvailableException;
+import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+public final class AdminUtils {
+
+    private AdminUtils() {}
+
+    /**
+     * Fetch the partition leader or wait until one is elected using the 
provided admin client.
+     */
+    public static int fetchOrWaitForLeader(Admin admin,
+                                           String topic,
+                                           int partitionNumber,
+                                           long timeoutMs) throws 
InterruptedException {
+
+        var condition = new Supplier<Boolean>() {
+            int leader = Node.noNode().id();
+            @Override
+            public Boolean get() {
+                checkLeader();
+                return this.leader != Node.noNode().id();
+            }
+
+            public void checkLeader() {
+                try {
+                    TopicDescription topicDescription = 
admin.describeTopics(List.of(topic))
+                            .allTopicNames().get().get(topic);
+
+                    Optional<Integer> leader = 
topicDescription.partitions().stream()
+                            .filter(partitionInfo -> partitionInfo.partition() 
== partitionNumber)
+                            .findFirst()
+                            .flatMap(partitionInfo -> 
Optional.ofNullable(partitionInfo.leader()))
+                            .map(node -> {
+                                int leaderId = node.id();
+                                return leaderId == Node.noNode().id() ? null : 
leaderId;
+                            });
+
+                    leader.ifPresent(integer -> this.leader = integer);
+                } catch (ExecutionException e) {
+                    Throwable cause = e.getCause();
+                    boolean isTransient = cause instanceof 
UnknownTopicOrPartitionException
+                            || cause instanceof LeaderNotAvailableException;
+                    if (!isTransient) {
+                        throw new RuntimeException(e);
+                    }
+                } catch (InterruptedException e) {
+                    Thread.currentThread().interrupt();
+                    throw new RuntimeException(e);
+                }
+            }
+        };
+
+        TestUtils.waitForCondition(condition, timeoutMs, "Timing out after %d 
ms since a leader was not elected for partition %s-%d".formatted(timeoutMs, 
topic, partitionNumber));
+
+        return condition.leader;
+    }
+}
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index f41fee0ff4f..21a63f13c01 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -35,13 +35,10 @@ import org.apache.kafka.clients.consumer.ShareConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.acl.AccessControlEntry;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.errors.LeaderNotAvailableException;
-import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
@@ -68,7 +65,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
@@ -415,47 +411,4 @@ public interface ClusterInstance {
                     .orElseThrow(() -> new RuntimeException("Leader not found 
for tp " + topicPartition));
         }
     }
-
-    /**
-     * Wait for a leader to be elected or changed using the provided admin 
client.
-     */
-    default int waitUntilLeaderIsElectedOrChangedWithAdmin(Admin admin,
-                                                           String topic,
-                                                           int partitionNumber,
-                                                           long timeoutMs) 
throws Exception {
-        long startTime = System.currentTimeMillis();
-        TopicPartition topicPartition = new TopicPartition(topic, 
partitionNumber);
-
-        while (System.currentTimeMillis() < startTime + timeoutMs) {
-            try {
-                TopicDescription topicDescription = 
admin.describeTopics(List.of(topic))
-                        .allTopicNames().get().get(topic);
-
-                Optional<Integer> leader = 
topicDescription.partitions().stream()
-                        .filter(partitionInfo -> partitionInfo.partition() == 
partitionNumber)
-                        .findFirst()
-                        .map(partitionInfo -> {
-                            int leaderId = partitionInfo.leader().id();
-                            return leaderId == Node.noNode().id() ? null : 
leaderId;
-                        });
-
-                if (leader.isPresent()) {
-                    return leader.get();
-                }
-            } catch (ExecutionException e) {
-                Throwable cause = e.getCause();
-                if (cause instanceof UnknownTopicOrPartitionException ||
-                        cause instanceof LeaderNotAvailableException) {
-                    continue;
-                } else {
-                    throw e;
-                }
-            }
-
-            TimeUnit.MILLISECONDS.sleep(Math.min(100L, timeoutMs));
-        }
-
-        throw new AssertionError("Timing out after " + timeoutMs +
-                " ms since a leader was not elected for partition " + 
topicPartition);
-    }
 }
diff --git 
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/AdminUtilsTest.java
 
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/AdminUtilsTest.java
new file mode 100644
index 00000000000..b4002734a40
--- /dev/null
+++ 
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/AdminUtilsTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.test;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientTestUtils;
+import org.apache.kafka.clients.admin.DescribeTopicsResult;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartitionInfo;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.anyCollection;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+@SuppressWarnings("resource")
+public class AdminUtilsTest {
+
+    @Test
+    void testFetchOrWaitForLeader() throws Exception {
+        String topic = "test-topic";
+        int partition = 0;
+        Admin admin = mock(Admin.class);
+        when(admin.describeTopics(anyCollection())).thenAnswer(new 
Answer<Object>() {
+            boolean called = false;
+
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable 
{
+                if (called) {
+                    return resultWithLeader(new Node(0, "", 0));
+                } else {
+                    called = true;
+                    return resultWithLeader(new Node(1, "", 0));
+                }
+            }
+
+            DescribeTopicsResult resultWithLeader(Node leader) {
+                TopicPartitionInfo topicPartitionInfo = new 
TopicPartitionInfo(partition, leader, List.of(), List.of());
+                return AdminClientTestUtils.describeTopicsResult(topic,
+                        new TopicDescription(topic, false, 
List.of(topicPartitionInfo)));
+            }
+        });
+
+        int result = AdminUtils.fetchOrWaitForLeader(admin, topic, partition, 
1000);
+
+        assertEquals(1, result);
+    }
+
+    @Test
+    void testFetchOrWaitForLeaderTimesOut() throws Exception {
+        String topic = "test-topic";
+        int partition = 0;
+        Node leader = null;
+        TopicPartitionInfo topicPartitionInfo = new 
TopicPartitionInfo(partition, leader, List.of(), List.of());
+        DescribeTopicsResult describeResult = 
AdminClientTestUtils.describeTopicsResult(topic,
+                new TopicDescription(topic, false, 
List.of(topicPartitionInfo)));
+        Admin admin = mock(Admin.class);
+        when(admin.describeTopics(anyCollection())).thenReturn(describeResult);
+
+        assertThrows(AssertionError.class, () ->
+                        AdminUtils.fetchOrWaitForLeader(admin, topic, 
partition, 1),
+                "Timing out after 1 ms since a leader was not elected for 
partition test-topic-0");
+    }
+}
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java 
b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
index d3d646cae7e..4111563e99e 100644
--- a/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/LeaderElectionCommandTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.test.AdminUtils;
 import org.apache.kafka.common.test.ClusterInstance;
 import org.apache.kafka.common.test.api.ClusterConfigProperty;
 import org.apache.kafka.common.test.api.ClusterTest;
@@ -446,7 +447,7 @@ public class LeaderElectionCommandTest {
     }
 
     private void assertLeader(Admin client, TopicPartition topicPartition, int 
expectedLeader) throws Exception {
-        int leader = 
cluster.waitUntilLeaderIsElectedOrChangedWithAdmin(client, 
topicPartition.topic(), topicPartition.partition(), 30000);
+        int leader = AdminUtils.fetchOrWaitForLeader(client, 
topicPartition.topic(), topicPartition.partition(), 30000);
         assertEquals(expectedLeader, leader);
     }
 

Reply via email to