This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 23e2af1e0f7 KAFKA-20192: Fix brittle logic in ReconfigurableQuorumIntegrationTest (#21519)
23e2af1e0f7 is described below
commit 23e2af1e0f721a38014315cae4b336114b223346
Author: Dmitry Werner <[email protected]>
AuthorDate: Fri Feb 27 14:07:22 2026 +0500
KAFKA-20192: Fix brittle logic in ReconfigurableQuorumIntegrationTest (#21519)
Reviewers: Mickael Maison <[email protected]>
---
.../ReconfigurableQuorumIntegrationTest.java | 65 +++++++++++-----------
1 file changed, 32 insertions(+), 33 deletions(-)
diff --git a/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
index c3a14875cfa..f89ce62c3d8 100644
--- a/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
+++ b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
@@ -30,7 +30,6 @@ import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.api.TestKitDefaults;
import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.KRaftVersion;
-import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -41,6 +40,8 @@ import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
+import static org.apache.kafka.test.TestUtils.assertFutureThrows;
+import static org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -48,7 +49,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
- static void checkKRaftVersions(Admin admin, short finalized) throws
Exception {
+ private static void checkKRaftVersions(Admin admin, short finalized)
throws Exception {
FeatureMetadata featureMetadata =
admin.describeFeatures().featureMetadata().get();
if (finalized > 0) {
assertTrue(featureMetadata.finalizedFeatures().containsKey(KRaftVersion.FEATURE_NAME),
@@ -77,9 +78,8 @@ public class ReconfigurableQuorumIntegrationTest {
cluster.format();
cluster.startup();
try (var admin = cluster.admin()) {
- TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
- checkKRaftVersions(admin,
KRaftVersion.KRAFT_VERSION_0.featureLevel());
- });
+ retryOnExceptionWithTimeout(30_000, () ->
+ checkKRaftVersions(admin,
KRaftVersion.KRAFT_VERSION_0.featureLevel()));
}
}
}
@@ -95,19 +95,16 @@ public class ReconfigurableQuorumIntegrationTest {
cluster.format();
cluster.startup();
try (var admin = cluster.admin()) {
- TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
- checkKRaftVersions(admin,
KRaftVersion.KRAFT_VERSION_1.featureLevel());
- });
+ retryOnExceptionWithTimeout(30_000, () ->
+ checkKRaftVersions(admin,
KRaftVersion.KRAFT_VERSION_1.featureLevel()));
}
}
}
- static Map<Integer, Uuid> findVoterDirs(Admin admin) throws Exception {
+ private static Map<Integer, Uuid> findVoterDirs(Admin admin) throws
Exception {
QuorumInfo quorumInfo =
admin.describeMetadataQuorum().quorumInfo().get();
Map<Integer, Uuid> result = new TreeMap<>();
- quorumInfo.voters().forEach(v -> {
- result.put(v.replicaId(), v.replicaDirectoryId());
- });
+ quorumInfo.voters().forEach(v -> result.put(v.replicaId(),
v.replicaDirectoryId()));
return result;
}
@@ -133,7 +130,7 @@ public class ReconfigurableQuorumIntegrationTest {
cluster.format();
cluster.startup();
try (var admin = cluster.admin()) {
- TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002}) {
@@ -168,7 +165,7 @@ public class ReconfigurableQuorumIntegrationTest {
cluster.format();
cluster.startup();
try (var admin = cluster.admin()) {
- TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002, 3003),
voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002, 3003}) {
@@ -176,19 +173,18 @@ public class ReconfigurableQuorumIntegrationTest {
}
});
Uuid dirId =
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+ int port = port(admin, 3000);
admin.removeRaftVoter(3000, dirId).all().get();
- TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3001, 3002, 3003), voters.keySet());
for (int replicaId : new int[] {3001, 3002, 3003}) {
assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
}
});
- admin.addRaftVoter(
- 3000,
- dirId,
- Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com",
8080))
- ).all().get();
+
+ retryOnExceptionWithTimeout(30_000, 1_000, () ->
+ admin.addRaftVoter(3000, dirId, Set.of(new
RaftVoterEndpoint("CONTROLLER", "localhost", port))).all().get());
}
}
}
@@ -207,7 +203,7 @@ public class ReconfigurableQuorumIntegrationTest {
cluster.format();
cluster.startup();
try (var admin = cluster.admin()) {
- TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002}) {
@@ -245,7 +241,7 @@ public class ReconfigurableQuorumIntegrationTest {
cluster.format();
cluster.startup();
try (var admin = cluster.admin()) {
- TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002}) {
@@ -276,7 +272,7 @@ public class ReconfigurableQuorumIntegrationTest {
cluster.format();
cluster.startup();
try (var admin = cluster.admin()) {
- TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
for (int replicaId : new int[] {3000, 3001, 3002}) {
@@ -285,12 +281,13 @@ public class ReconfigurableQuorumIntegrationTest {
});
Uuid dirId =
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+ int port = port(admin, 3000);
admin.removeRaftVoter(
3000,
dirId,
new
RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
).all().get();
- TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3001, 3002), voters.keySet());
for (int replicaId : new int[] {3001, 3002}) {
@@ -298,12 +295,9 @@ public class ReconfigurableQuorumIntegrationTest {
}
});
- admin.addRaftVoter(
- 3000,
- dirId,
- Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com",
8080)),
- new
AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
- ).all().get();
+ retryOnExceptionWithTimeout(30_000, 1_000, () ->
+ admin.addRaftVoter(3000, dirId, Set.of(new
RaftVoterEndpoint("CONTROLLER", "localhost", port)),
+ new
AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))).all().get());
}
}
}
@@ -334,16 +328,21 @@ public class ReconfigurableQuorumIntegrationTest {
dirId,
new
RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
-
TestUtils.assertFutureThrows(InconsistentClusterIdException.class,
removeFuture);
+ assertFutureThrows(InconsistentClusterIdException.class,
removeFuture);
var addFuture = admin.addRaftVoter(
3000,
dirId,
- Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com",
8080)),
+ Set.of(new RaftVoterEndpoint("CONTROLLER", "localhost",
port(admin, 3000))),
new
AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
).all();
-
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
+ assertFutureThrows(InconsistentClusterIdException.class,
addFuture);
}
}
}
+
+ private static int port(Admin admin, int nodeId) throws Exception {
+ return
admin.describeMetadataQuorum().quorumInfo().get().nodes().get(nodeId).endpoints().stream()
+ .findFirst().orElseThrow().port();
+ }
}
\ No newline at end of file