This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 55577e73b2a KAFKA-18083 ClusterInstance custom controllerListener not work (#17932)
55577e73b2a is described below

commit 55577e73b2a30a7f47cb2de3c22973020d61e230
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Tue Nov 26 22:01:21 2024 +0800

    KAFKA-18083 ClusterInstance custom controllerListener not work (#17932)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/common/test/KafkaClusterTestKit.java     | 11 ++++++-----
 .../kafka/common/test/api/ClusterTestExtensionsTest.java      |  8 ++++++++
 2 files changed, 14 insertions(+), 5 deletions(-)

diff --git a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 9d0c4daa5e7..ee1bf34a5d1 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -111,7 +111,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
         private final String brokerSecurityProtocol;
         private final String controllerSecurityProtocol;
 
-
         public Builder(TestKitNodes nodes) {
             this.nodes = nodes;
             this.brokerListenerName = nodes.brokerListenerName().value();
@@ -169,7 +168,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     append("@").
                     append("localhost").
                     append(":").
-                    append(socketFactoryManager.getOrCreatePortForListener(nodeId, "CONTROLLER"));
+                    append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName));
                 prefix = ",";
             }
             props.put(QuorumConfig.QUORUM_VOTERS_CONFIG, quorumVoterStringBuilder.toString());
@@ -199,7 +198,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
             try {
                 baseDirectory = new File(nodes.baseDirectory());
                 for (TestKitNode node : nodes.controllerNodes().values()) {
-                    socketFactoryManager.getOrCreatePortForListener(node.id(), "CONTROLLER");
+                    socketFactoryManager.getOrCreatePortForListener(node.id(), controllerListenerName);
                 }
                 for (TestKitNode node : nodes.controllerNodes().values()) {
                     setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
@@ -316,6 +315,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
     private final File baseDirectory;
     private final SimpleFaultHandlerFactory faultHandlerFactory;
     private final PreboundSocketFactoryManager socketFactoryManager;
+    private final String controllerListenerName;
 
     private KafkaClusterTestKit(
         TestKitNodes nodes,
@@ -338,6 +338,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
         this.baseDirectory = baseDirectory;
         this.faultHandlerFactory = faultHandlerFactory;
         this.socketFactoryManager = socketFactoryManager;
+        this.controllerListenerName = nodes.controllerListenerName().value();
     }
 
     public void format() throws Exception {
@@ -389,7 +390,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
             formatter.setUnstableFeatureVersionsEnabled(true);
             formatter.setIgnoreFormatted(false);
-            formatter.setControllerListenerName("CONTROLLER");
+            formatter.setControllerListenerName(controllerListenerName);
             if (writeMetadataDirectory) {
                 formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
             } else {
@@ -400,7 +401,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 String prefix = "";
                 for (TestKitNode controllerNode : nodes.controllerNodes().values()) {
                     int port = socketFactoryManager.
-                        getOrCreatePortForListener(controllerNode.id(), "CONTROLLER");
+                        getOrCreatePortForListener(controllerNode.id(), controllerListenerName);
                     dynamicVotersBuilder.append(prefix);
                     prefix = ",";
                     dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
index b96e8c28c08..bc20aa14273 100644
--- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
+++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
@@ -322,4 +322,12 @@ public class ClusterTestExtensionsTest {
             assertEquals(value, records.get(0).value());
         }
     }
+
+    @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, controllerListener = "FOO")
+    public void testControllerListenerName(ClusterInstance cluster) throws ExecutionException, InterruptedException {
+        assertEquals("FOO", cluster.controllerListenerName().get().value());
+        try (Admin admin = cluster.admin(Map.of(), true)) {
+            assertEquals(1, admin.describeMetadataQuorum().quorumInfo().get().nodes().size());
+        }
+    }
 }

Reply via email to