This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4a5aa371697 MINOR: Move ReconfigurableQuorumIntegrationTest from core
module to server module (#20636)
4a5aa371697 is described below
commit 4a5aa3716978a766827521af1942ec54d6636324
Author: Chang-Chi Hsu <[email protected]>
AuthorDate: Tue Oct 7 19:10:58 2025 +0200
MINOR: Move ReconfigurableQuorumIntegrationTest from core module to server
module (#20636)
It moves the `ReconfigurableQuorumIntegrationTest` class to the
`org.apache.kafka.server` package and consolidates two related tests,
`RemoveAndAddVoterWithValidClusterId` and
`RemoveAndAddVoterWithInconsistentClusterId`, into a single file. This
improves code organization and reduces redundancy.
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../admin/ReconfigurableQuorumIntegrationTest.java | 132 ---------------------
.../ReconfigurableQuorumIntegrationTest.java | 113 ++++++++++++++++--
2 files changed, 105 insertions(+), 140 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java
deleted file mode 100644
index f02db36c061..00000000000
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.InconsistentClusterIdException;
-import org.apache.kafka.common.test.KafkaClusterTestKit;
-import org.apache.kafka.common.test.TestKitNodes;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-@Tag("integration")
-public class ReconfigurableQuorumIntegrationTest {
-
- static Map<Integer, Uuid> descVoterDirs(Admin admin) throws
ExecutionException, InterruptedException {
- var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
- return
quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId,
QuorumInfo.ReplicaState::replicaDirectoryId));
- }
-
- @Test
- public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
- final var nodes = new TestKitNodes.Builder()
- .setClusterId("test-cluster")
- .setNumBrokerNodes(1)
- .setNumControllerNodes(3)
- .build();
-
- final Map<Integer, Uuid> initialVoters = new HashMap<>();
- for (final var controllerNode : nodes.controllerNodes().values()) {
- initialVoters.put(
- controllerNode.id(),
- controllerNode.metadataDirectoryId()
- );
- }
-
- try (var cluster = new
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
- cluster.format();
- cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
- TestUtils.waitForCondition(() -> {
- Map<Integer, Uuid> voters = descVoterDirs(admin);
- assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
- return voters.values().stream().noneMatch(directory ->
directory.equals(Uuid.ZERO_UUID));
- }, "Initial quorum voters should be {3000, 3001, 3002} and all
should have non-zero directory IDs");
-
- Uuid dirId =
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
- admin.removeRaftVoter(
- 3000,
- dirId,
- new
RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
- ).all().get();
- TestUtils.waitForCondition(() -> {
- Map<Integer, Uuid> voters = descVoterDirs(admin);
- assertEquals(Set.of(3001, 3002), voters.keySet());
- return voters.values().stream().noneMatch(directory ->
directory.equals(Uuid.ZERO_UUID));
- }, "After removing voter 3000, remaining voters should be
{3001, 3002} with non-zero directory IDs");
-
- admin.addRaftVoter(
- 3000,
- dirId,
- Set.of(new RaftVoterEndpoint("CONTROLLER",
"example.com", 8080)),
- new
AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
- ).all().get();
- }
- }
- }
-
- @Test
- public void testRemoveAndAddVoterWithInconsistentClusterId() throws
Exception {
- final var nodes = new TestKitNodes.Builder()
- .setClusterId("test-cluster")
- .setNumBrokerNodes(1)
- .setNumControllerNodes(3)
- .build();
-
- final Map<Integer, Uuid> initialVoters = new HashMap<>();
- for (final var controllerNode : nodes.controllerNodes().values()) {
- initialVoters.put(
- controllerNode.id(),
- controllerNode.metadataDirectoryId()
- );
- }
-
- try (var cluster = new
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
- cluster.format();
- cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
- Uuid dirId =
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
- var removeFuture = admin.removeRaftVoter(
- 3000,
- dirId,
- new
RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
- ).all();
-
TestUtils.assertFutureThrows(InconsistentClusterIdException.class,
removeFuture);
-
- var addFuture = admin.addRaftVoter(
- 3000,
- dirId,
- Set.of(new RaftVoterEndpoint("CONTROLLER",
"example.com", 8080)),
- new
AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
- ).all();
-
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
- }
- }
- }
-}
diff --git
a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
similarity index 69%
rename from
core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
rename to
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
index c67e941dd7a..ad30708d7c5 100644
--- a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
+++
b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
@@ -15,13 +15,16 @@
* limitations under the License.
*/
-package kafka.server;
+package org.apache.kafka.server;
+import org.apache.kafka.clients.admin.AddRaftVoterOptions;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.FeatureMetadata;
import org.apache.kafka.clients.admin.QuorumInfo;
import org.apache.kafka.clients.admin.RaftVoterEndpoint;
+import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.api.TestKitDefaults;
@@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
@@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
+@Tag("integration")
public class ReconfigurableQuorumIntegrationTest {
static void checkKRaftVersions(Admin admin, short finalized) throws
Exception {
FeatureMetadata featureMetadata =
admin.describeFeatures().featureMetadata().get();
@@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
).build()) {
cluster.format();
cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin,
KRaftVersion.KRAFT_VERSION_0.featureLevel());
});
@@ -88,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
).setStandalone(true).build()) {
cluster.format();
cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
checkKRaftVersions(admin,
KRaftVersion.KRAFT_VERSION_1.featureLevel());
});
@@ -126,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -161,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002, 3003),
voters.keySet());
@@ -200,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -238,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
) {
cluster.format();
cluster.startup();
- try (Admin admin = Admin.create(cluster.clientProperties())) {
+ try (var admin = Admin.create(cluster.clientProperties())) {
TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
Map<Integer, Uuid> voters = findVoterDirs(admin);
assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -249,4 +255,95 @@ public class ReconfigurableQuorumIntegrationTest {
}
}
}
-}
+
+ @Test
+ public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
+ final var nodes = new TestKitNodes.Builder()
+ .setClusterId("test-cluster")
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(),
+ controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (var cluster = new
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+ cluster.format();
+ cluster.startup();
+ try (var admin = Admin.create(cluster.clientProperties())) {
+ TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+ for (int replicaId : new int[] {3000, 3001, 3002}) {
+ assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+ }
+ });
+
+ Uuid dirId =
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+ admin.removeRaftVoter(
+ 3000,
+ dirId,
+ new
RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+ ).all().get();
+ TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+ Map<Integer, Uuid> voters = findVoterDirs(admin);
+ assertEquals(Set.of(3001, 3002), voters.keySet());
+ for (int replicaId : new int[] {3001, 3002}) {
+ assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+ }
+ });
+
+ admin.addRaftVoter(
+ 3000,
+ dirId,
+ Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com",
8080)),
+ new
AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+ ).all().get();
+ }
+ }
+ }
+
+ @Test
+ public void testRemoveAndAddVoterWithInconsistentClusterId() throws
Exception {
+ final var nodes = new TestKitNodes.Builder()
+ .setClusterId("test-cluster")
+ .setNumBrokerNodes(1)
+ .setNumControllerNodes(3)
+ .build();
+
+ final Map<Integer, Uuid> initialVoters = new HashMap<>();
+ for (final var controllerNode : nodes.controllerNodes().values()) {
+ initialVoters.put(
+ controllerNode.id(),
+ controllerNode.metadataDirectoryId()
+ );
+ }
+
+ try (var cluster = new
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+ cluster.format();
+ cluster.startup();
+ try (var admin = Admin.create(cluster.clientProperties())) {
+ Uuid dirId =
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+ var removeFuture = admin.removeRaftVoter(
+ 3000,
+ dirId,
+ new
RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+ ).all();
+
TestUtils.assertFutureThrows(InconsistentClusterIdException.class,
removeFuture);
+
+ var addFuture = admin.addRaftVoter(
+ 3000,
+ dirId,
+ Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com",
8080)),
+ new
AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+ ).all();
+
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
+ }
+ }
+ }
+}
\ No newline at end of file