This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4a5aa371697 MINOR: Move ReconfigurableQuorumIntegrationTest from core 
module to server module  (#20636)
4a5aa371697 is described below

commit 4a5aa3716978a766827521af1942ec54d6636324
Author: Chang-Chi Hsu <[email protected]>
AuthorDate: Tue Oct 7 19:10:58 2025 +0200

    MINOR: Move ReconfigurableQuorumIntegrationTest from core module to server 
module  (#20636)
    
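    This commit moves the `ReconfigurableQuorumIntegrationTest` class from the
    `kafka.server` package in the core module to the `org.apache.kafka.server`
    package in the server module. It also consolidates two related tests,
    `testRemoveAndAddVoterWithValidClusterId` and
    `testRemoveAndAddVoterWithInconsistentClusterId`, from the duplicate class
    of the same name in `clients-integration-tests` into the moved file, and
    deletes that duplicate. This improves code organization and reduces
    redundancy.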
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../admin/ReconfigurableQuorumIntegrationTest.java | 132 ---------------------
 .../ReconfigurableQuorumIntegrationTest.java       | 113 ++++++++++++++++--
 2 files changed, 105 insertions(+), 140 deletions(-)

diff --git 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java
 
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java
deleted file mode 100644
index f02db36c061..00000000000
--- 
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/admin/ReconfigurableQuorumIntegrationTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.errors.InconsistentClusterIdException;
-import org.apache.kafka.common.test.KafkaClusterTestKit;
-import org.apache.kafka.common.test.TestKitNodes;
-import org.apache.kafka.test.TestUtils;
-
-import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-@Tag("integration")
-public class ReconfigurableQuorumIntegrationTest {
-
-    static Map<Integer, Uuid> descVoterDirs(Admin admin) throws 
ExecutionException, InterruptedException {
-        var quorumInfo = admin.describeMetadataQuorum().quorumInfo().get();
-        return 
quorumInfo.voters().stream().collect(Collectors.toMap(QuorumInfo.ReplicaState::replicaId,
 QuorumInfo.ReplicaState::replicaDirectoryId));
-    }
-
-    @Test
-    public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
-        final var nodes = new TestKitNodes.Builder()
-                .setClusterId("test-cluster")
-                .setNumBrokerNodes(1)
-                .setNumControllerNodes(3)
-                .build();
-
-        final Map<Integer, Uuid> initialVoters = new HashMap<>();
-        for (final var controllerNode : nodes.controllerNodes().values()) {
-            initialVoters.put(
-                    controllerNode.id(),
-                    controllerNode.metadataDirectoryId()
-            );
-        }
-
-        try (var cluster = new 
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
-            cluster.format();
-            cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
-                TestUtils.waitForCondition(() -> {
-                    Map<Integer, Uuid> voters = descVoterDirs(admin);
-                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
-                    return voters.values().stream().noneMatch(directory -> 
directory.equals(Uuid.ZERO_UUID));
-                }, "Initial quorum voters should be {3000, 3001, 3002} and all 
should have non-zero directory IDs");
-
-                Uuid dirId = 
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
-                admin.removeRaftVoter(
-                        3000,
-                        dirId,
-                        new 
RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
-                ).all().get();
-                TestUtils.waitForCondition(() -> {
-                    Map<Integer, Uuid> voters = descVoterDirs(admin);
-                    assertEquals(Set.of(3001, 3002), voters.keySet());
-                    return voters.values().stream().noneMatch(directory -> 
directory.equals(Uuid.ZERO_UUID));
-                }, "After removing voter 3000, remaining voters should be 
{3001, 3002} with non-zero directory IDs");
-
-                admin.addRaftVoter(
-                        3000,
-                        dirId,
-                        Set.of(new RaftVoterEndpoint("CONTROLLER", 
"example.com", 8080)),
-                        new 
AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
-                ).all().get();
-            }
-        }
-    }
-
-    @Test
-    public void testRemoveAndAddVoterWithInconsistentClusterId() throws 
Exception {
-        final var nodes = new TestKitNodes.Builder()
-                .setClusterId("test-cluster")
-                .setNumBrokerNodes(1)
-                .setNumControllerNodes(3)
-                .build();
-
-        final Map<Integer, Uuid> initialVoters = new HashMap<>();
-        for (final var controllerNode : nodes.controllerNodes().values()) {
-            initialVoters.put(
-                    controllerNode.id(),
-                    controllerNode.metadataDirectoryId()
-            );
-        }
-
-        try (var cluster = new 
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
-            cluster.format();
-            cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
-                Uuid dirId = 
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
-                var removeFuture = admin.removeRaftVoter(
-                        3000,
-                        dirId,
-                        new 
RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
-                ).all();
-                
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, 
removeFuture);
-
-                var addFuture = admin.addRaftVoter(
-                        3000,
-                        dirId,
-                        Set.of(new RaftVoterEndpoint("CONTROLLER", 
"example.com", 8080)),
-                        new 
AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
-                ).all();
-                
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
-            }
-        }
-    }
-}
diff --git 
a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java 
b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
similarity index 69%
rename from 
core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
rename to 
server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
index c67e941dd7a..ad30708d7c5 100644
--- a/core/src/test/java/kafka/server/ReconfigurableQuorumIntegrationTest.java
+++ 
b/server/src/test/java/org/apache/kafka/server/ReconfigurableQuorumIntegrationTest.java
@@ -15,13 +15,16 @@
  * limitations under the License.
  */
 
-package kafka.server;
+package org.apache.kafka.server;
 
+import org.apache.kafka.clients.admin.AddRaftVoterOptions;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.FeatureMetadata;
 import org.apache.kafka.clients.admin.QuorumInfo;
 import org.apache.kafka.clients.admin.RaftVoterEndpoint;
+import org.apache.kafka.clients.admin.RemoveRaftVoterOptions;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.test.KafkaClusterTestKit;
 import org.apache.kafka.common.test.TestKitNodes;
 import org.apache.kafka.common.test.api.TestKitDefaults;
@@ -29,10 +32,12 @@ import org.apache.kafka.raft.QuorumConfig;
 import org.apache.kafka.server.common.KRaftVersion;
 import org.apache.kafka.test.TestUtils;
 
+import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.Test;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.TreeMap;
 
@@ -41,6 +46,7 @@ import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
+@Tag("integration")
 public class ReconfigurableQuorumIntegrationTest {
     static void checkKRaftVersions(Admin admin, short finalized) throws 
Exception {
         FeatureMetadata featureMetadata = 
admin.describeFeatures().featureMetadata().get();
@@ -70,7 +76,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ).build()) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                     checkKRaftVersions(admin, 
KRaftVersion.KRAFT_VERSION_0.featureLevel());
                 });
@@ -88,7 +94,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ).setStandalone(true).build()) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, () -> {
                     checkKRaftVersions(admin, 
KRaftVersion.KRAFT_VERSION_1.featureLevel());
                 });
@@ -126,7 +132,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -161,7 +167,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002, 3003), 
voters.keySet());
@@ -200,7 +206,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -238,7 +244,7 @@ public class ReconfigurableQuorumIntegrationTest {
         ) {
             cluster.format();
             cluster.startup();
-            try (Admin admin = Admin.create(cluster.clientProperties())) {
+            try (var admin = Admin.create(cluster.clientProperties())) {
                 TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
                     Map<Integer, Uuid> voters = findVoterDirs(admin);
                     assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
@@ -249,4 +255,95 @@ public class ReconfigurableQuorumIntegrationTest {
             }
         }
     }
-}
+
+    @Test
+    public void testRemoveAndAddVoterWithValidClusterId() throws Exception {
+        final var nodes = new TestKitNodes.Builder()
+            .setClusterId("test-cluster")
+            .setNumBrokerNodes(1)
+            .setNumControllerNodes(3)
+            .build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (var cluster = new 
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3000, 3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3000, 3001, 3002}) {
+                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+                    }
+                });
+
+                Uuid dirId = 
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+                admin.removeRaftVoter(
+                    3000,
+                    dirId,
+                    new 
RemoveRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+                ).all().get();
+                TestUtils.retryOnExceptionWithTimeout(30_000, 10, () -> {
+                    Map<Integer, Uuid> voters = findVoterDirs(admin);
+                    assertEquals(Set.of(3001, 3002), voters.keySet());
+                    for (int replicaId : new int[] {3001, 3002}) {
+                        assertNotEquals(Uuid.ZERO_UUID, voters.get(replicaId));
+                    }
+                });
+
+                admin.addRaftVoter(
+                    3000,
+                    dirId,
+                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 
8080)),
+                    new 
AddRaftVoterOptions().setClusterId(Optional.of("test-cluster"))
+                ).all().get();
+            }
+        }
+    }
+
+    @Test
+    public void testRemoveAndAddVoterWithInconsistentClusterId() throws 
Exception {
+        final var nodes = new TestKitNodes.Builder()
+            .setClusterId("test-cluster")
+            .setNumBrokerNodes(1)
+            .setNumControllerNodes(3)
+            .build();
+
+        final Map<Integer, Uuid> initialVoters = new HashMap<>();
+        for (final var controllerNode : nodes.controllerNodes().values()) {
+            initialVoters.put(
+                controllerNode.id(),
+                controllerNode.metadataDirectoryId()
+            );
+        }
+
+        try (var cluster = new 
KafkaClusterTestKit.Builder(nodes).setInitialVoterSet(initialVoters).build()) {
+            cluster.format();
+            cluster.startup();
+            try (var admin = Admin.create(cluster.clientProperties())) {
+                Uuid dirId = 
cluster.nodes().controllerNodes().get(3000).metadataDirectoryId();
+                var removeFuture = admin.removeRaftVoter(
+                    3000,
+                    dirId,
+                    new 
RemoveRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+                ).all();
+                
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, 
removeFuture);
+
+                var addFuture = admin.addRaftVoter(
+                    3000,
+                    dirId,
+                    Set.of(new RaftVoterEndpoint("CONTROLLER", "example.com", 
8080)),
+                    new 
AddRaftVoterOptions().setClusterId(Optional.of("inconsistent"))
+                ).all();
+                
TestUtils.assertFutureThrows(InconsistentClusterIdException.class, addFuture);
+            }
+        }
+    }
+}
\ No newline at end of file

Reply via email to