This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c6294aacef5 KAFKA-17721 Enable to configure listener name and protocol 
for controller (#17525)
c6294aacef5 is described below

commit c6294aacef5d07b023a688b24bae35eeb3d4fdcb
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Nov 20 23:06:29 2024 +0800

    KAFKA-17721 Enable to configure listener name and protocol for controller 
(#17525)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/common/test/TestKitNodes.java | 20 ++++++++++--
 .../apache/kafka/common/test/TestKitNodeTest.java  | 13 ++++++--
 .../kafka/common/test/api/ClusterConfig.java       | 38 ++++++++++++++++++++--
 .../apache/kafka/common/test/api/ClusterTest.java  |  5 ++-
 .../common/test/api/ClusterTestExtensions.java     |  4 ++-
 .../org/apache/kafka/common/test/api/README.md     |  4 +--
 .../test/api/RaftClusterInvocationContext.java     |  3 +-
 .../kafka/common/test/api/ClusterConfigTest.java   |  6 ++++
 8 files changed, 80 insertions(+), 13 deletions(-)

diff --git 
a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java 
b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index 42a621c9bbb..dfab866d3a6 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -48,6 +48,8 @@ public class TestKitNodes {
     public static final int BROKER_ID_OFFSET = 0;
     public static final SecurityProtocol DEFAULT_BROKER_SECURITY_PROTOCOL = 
SecurityProtocol.PLAINTEXT;
     public static final String DEFAULT_BROKER_LISTENER_NAME = "EXTERNAL";
+    public static final SecurityProtocol DEFAULT_CONTROLLER_SECURITY_PROTOCOL 
= SecurityProtocol.PLAINTEXT;
+    public static final String DEFAULT_CONTROLLER_LISTENER_NAME = "CONTROLLER";
 
     public static class Builder {
         private boolean combined;
@@ -66,10 +68,12 @@ public class TestKitNodes {
         public Builder(BootstrapMetadata bootstrapMetadata) {
             this.bootstrapMetadata = bootstrapMetadata;
         }
-        // The brokerListenerName and brokerSecurityProtocol configurations 
must
+        // The broker and controller listener name and SecurityProtocol 
configurations must
         // be kept in sync with the default values in ClusterTest.
         private ListenerName brokerListenerName = 
ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME);
         private SecurityProtocol brokerSecurityProtocol = 
DEFAULT_BROKER_SECURITY_PROTOCOL;
+        private ListenerName controllerListenerName = 
ListenerName.normalised(DEFAULT_CONTROLLER_LISTENER_NAME);
+        private SecurityProtocol controllerSecurityProtocol = 
DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
 
         public Builder setClusterId(String clusterId) {
             this.clusterId = clusterId;
@@ -134,6 +138,16 @@ public class TestKitNodes {
             return this;
         }
 
+        public Builder setControllerListenerName(ListenerName listenerName) {
+            this.controllerListenerName = listenerName;
+            return this;
+        }
+
+        public Builder setControllerSecurityProtocol(SecurityProtocol 
securityProtocol) {
+            this.controllerSecurityProtocol = securityProtocol;
+            return this;
+        }
+
         public TestKitNodes build() {
             if (numControllerNodes < 0) {
                 throw new IllegalArgumentException("Invalid negative value for 
numControllerNodes");
@@ -145,7 +159,7 @@ public class TestKitNodes {
                 throw new IllegalArgumentException("Invalid value for 
numDisksPerBroker");
             }
             // TODO: remove this assertion after 
https://issues.apache.org/jira/browse/KAFKA-16680 is finished
-            if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
+            if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT || 
controllerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
                 throw new IllegalArgumentException("Currently only support 
PLAINTEXT security protocol");
             }
             if (baseDirectory == null) {
@@ -203,7 +217,7 @@ public class TestKitNodes {
             }
 
             return new TestKitNodes(baseDirectory.toFile().getAbsolutePath(), 
clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
-                brokerListenerName, brokerSecurityProtocol, new 
ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT);
+                brokerListenerName, brokerSecurityProtocol, 
controllerListenerName, controllerSecurityProtocol);
         }
     }
 
diff --git 
a/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java 
b/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
index 1ba7e58abe8..c9adbe3431b 100644
--- 
a/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
+++ 
b/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
@@ -36,18 +36,25 @@ public class TestKitNodeTest {
             assertEquals("Currently only support PLAINTEXT security protocol",
                     assertThrows(IllegalArgumentException.class,
                             () -> new 
TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage());
+            assertEquals("Currently only support PLAINTEXT security protocol",
+                assertThrows(IllegalArgumentException.class,
+                    () -> new 
TestKitNodes.Builder().setControllerSecurityProtocol(securityProtocol).build()).getMessage());
         }
     }
 
     @Test
     public void testListenerName() {
-        ListenerName listenerName = ListenerName.normalised("FOOBAR");
+        ListenerName brokerListenerName = ListenerName.normalised("FOOBAR");
+        ListenerName controllerListenerName = 
ListenerName.normalised("BAZQUX");
         TestKitNodes testKitNodes = new TestKitNodes.Builder()
                 .setNumBrokerNodes(1)
                 .setNumControllerNodes(1)
-                .setBrokerListenerName(listenerName)
+                .setBrokerListenerName(brokerListenerName)
                 .setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT)
+                .setControllerListenerName(controllerListenerName)
+                .setControllerSecurityProtocol(SecurityProtocol.PLAINTEXT)
                 .build();
-        assertEquals(listenerName, testKitNodes.brokerListenerName());
+        assertEquals(brokerListenerName, testKitNodes.brokerListenerName());
+        assertEquals(controllerListenerName, 
testKitNodes.controllerListenerName());
     }
 }
diff --git 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
index 4d5e07dfa05..1697a1fe0a6 100644
--- 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
+++ 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
@@ -38,6 +38,8 @@ import java.util.stream.Stream;
 
 import static 
org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
 import static 
org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_SECURITY_PROTOCOL;
+import static 
org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_LISTENER_NAME;
+import static 
org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
 
 /**
  * Represents an immutable requested configuration of a Kafka cluster for 
integration testing.
@@ -51,6 +53,8 @@ public class ClusterConfig {
     private final boolean autoStart;
     private final SecurityProtocol brokerSecurityProtocol;
     private final ListenerName brokerListenerName;
+    private final SecurityProtocol controllerSecurityProtocol;
+    private final ListenerName controllerListenerName;
     private final File trustStoreFile;
     private final MetadataVersion metadataVersion;
 
@@ -66,7 +70,8 @@ public class ClusterConfig {
 
     @SuppressWarnings("checkstyle:ParameterNumber")
     private ClusterConfig(Set<Type> types, int brokers, int controllers, int 
disksPerBroker, boolean autoStart,
-                  SecurityProtocol brokerSecurityProtocol, ListenerName 
brokerListenerName, File trustStoreFile,
+                  SecurityProtocol brokerSecurityProtocol, ListenerName 
brokerListenerName,
+                  SecurityProtocol controllerSecurityProtocol, ListenerName 
controllerListenerName, File trustStoreFile,
                   MetadataVersion metadataVersion, Map<String, String> 
serverProperties, Map<String, String> producerProperties,
                   Map<String, String> consumerProperties, Map<String, String> 
adminClientProperties, Map<String, String> saslServerProperties,
                   Map<String, String> saslClientProperties, Map<Integer, 
Map<String, String>> perServerProperties, List<String> tags,
@@ -83,6 +88,8 @@ public class ClusterConfig {
         this.autoStart = autoStart;
         this.brokerSecurityProtocol = 
Objects.requireNonNull(brokerSecurityProtocol);
         this.brokerListenerName = Objects.requireNonNull(brokerListenerName);
+        this.controllerSecurityProtocol = 
Objects.requireNonNull(controllerSecurityProtocol);
+        this.controllerListenerName = 
Objects.requireNonNull(controllerListenerName);
         this.trustStoreFile = trustStoreFile;
         this.metadataVersion = Objects.requireNonNull(metadataVersion);
         this.serverProperties = Objects.requireNonNull(serverProperties);
@@ -144,6 +151,14 @@ public class ClusterConfig {
         return brokerSecurityProtocol;
     }
 
+    public ListenerName controllerListenerName() {
+        return controllerListenerName;
+    }
+
+    public SecurityProtocol controllerSecurityProtocol() {
+        return controllerSecurityProtocol;
+    }
+
     public ListenerName brokerListenerName() {
         return brokerListenerName;
     }
@@ -173,6 +188,8 @@ public class ClusterConfig {
         displayTags.add("MetadataVersion=" + metadataVersion);
         displayTags.add("BrokerSecurityProtocol=" + 
brokerSecurityProtocol.name());
         displayTags.add("BrokerListenerName=" + brokerListenerName);
+        displayTags.add("ControllerSecurityProtocol=" + 
controllerSecurityProtocol.name());
+        displayTags.add("ControllerListenerName=" + controllerListenerName);
         return displayTags;
     }
 
@@ -185,6 +202,8 @@ public class ClusterConfig {
                 .setAutoStart(true)
                 .setBrokerSecurityProtocol(DEFAULT_BROKER_SECURITY_PROTOCOL)
                 
.setBrokerListenerName(ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME))
+                
.setControllerSecurityProtocol(DEFAULT_CONTROLLER_SECURITY_PROTOCOL)
+                
.setControllerListenerName(ListenerName.normalised(DEFAULT_CONTROLLER_LISTENER_NAME))
                 .setMetadataVersion(MetadataVersion.latestTesting());
     }
 
@@ -201,6 +220,8 @@ public class ClusterConfig {
                 .setAutoStart(clusterConfig.autoStart)
                 
.setBrokerSecurityProtocol(clusterConfig.brokerSecurityProtocol)
                 .setBrokerListenerName(clusterConfig.brokerListenerName)
+                
.setControllerSecurityProtocol(clusterConfig.controllerSecurityProtocol)
+                
.setControllerListenerName(clusterConfig.controllerListenerName)
                 .setTrustStoreFile(clusterConfig.trustStoreFile)
                 .setMetadataVersion(clusterConfig.metadataVersion)
                 .setServerProperties(clusterConfig.serverProperties)
@@ -222,6 +243,8 @@ public class ClusterConfig {
         private boolean autoStart;
         private SecurityProtocol brokerSecurityProtocol;
         private ListenerName brokerListenerName;
+        private SecurityProtocol controllerSecurityProtocol;
+        private ListenerName controllerListenerName;
         private File trustStoreFile;
         private MetadataVersion metadataVersion;
         private Map<String, String> serverProperties = Collections.emptyMap();
@@ -271,6 +294,16 @@ public class ClusterConfig {
             return this;
         }
 
+        public Builder setControllerSecurityProtocol(SecurityProtocol 
securityProtocol) {
+            this.controllerSecurityProtocol = securityProtocol;
+            return this;
+        }
+
+        public Builder setControllerListenerName(ListenerName listenerName) {
+            this.controllerListenerName = listenerName;
+            return this;
+        }
+
         public Builder setTrustStoreFile(File trustStoreFile) {
             this.trustStoreFile = trustStoreFile;
             return this;
@@ -329,7 +362,8 @@ public class ClusterConfig {
         }
 
         public ClusterConfig build() {
-            return new ClusterConfig(types, brokers, controllers, 
disksPerBroker, autoStart, brokerSecurityProtocol, brokerListenerName,
+            return new ClusterConfig(types, brokers, controllers, 
disksPerBroker, autoStart,
+                    brokerSecurityProtocol, brokerListenerName, 
controllerSecurityProtocol, controllerListenerName,
                     trustStoreFile, metadataVersion, serverProperties, 
producerProperties, consumerProperties,
                     adminClientProperties, saslServerProperties, 
saslClientProperties,
                     perServerProperties, tags, features);
diff --git 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
index 574ae85abb3..86aba1030d8 100644
--- 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
+++ 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
@@ -31,6 +31,7 @@ import java.lang.annotation.Target;
 import static java.lang.annotation.ElementType.METHOD;
 import static java.lang.annotation.RetentionPolicy.RUNTIME;
 import static 
org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
+import static 
org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_LISTENER_NAME;
 
 @Documented
 @Target({METHOD})
@@ -44,11 +45,13 @@ public @interface ClusterTest {
     int controllers() default 0;
     int disksPerBroker() default 0;
     AutoStart autoStart() default AutoStart.DEFAULT;
-    // The brokerListenerName and brokerSecurityProtocol configurations must
+    // The broker/controller listener name and SecurityProtocol configurations 
must
     // be kept in sync with the default values in TestKitNodes, as many tests
     // directly use TestKitNodes without relying on the ClusterTest annotation.
     SecurityProtocol brokerSecurityProtocol() default 
SecurityProtocol.PLAINTEXT;
     String brokerListener() default DEFAULT_BROKER_LISTENER_NAME;
+    SecurityProtocol controllerSecurityProtocol() default 
SecurityProtocol.PLAINTEXT;
+    String controllerListener() default DEFAULT_CONTROLLER_LISTENER_NAME;
     MetadataVersion metadataVersion() default MetadataVersion.IBP_4_0_IV3;
     ClusterConfigProperty[] serverProperties() default {};
     // users can add tags that they want to display in test
diff --git 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java
 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java
index 827fb0cf67c..180acd96306 100644
--- 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java
+++ 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java
@@ -250,9 +250,11 @@ public class ClusterTestExtensions implements 
TestTemplateInvocationContextProvi
             .setDisksPerBroker(clusterTest.disksPerBroker() == 0 ? 
defaults.disksPerBroker() : clusterTest.disksPerBroker())
             .setAutoStart(clusterTest.autoStart() == AutoStart.DEFAULT ? 
defaults.autoStart() : clusterTest.autoStart() == AutoStart.YES)
             
.setBrokerListenerName(ListenerName.normalised(clusterTest.brokerListener()))
+            .setBrokerSecurityProtocol(clusterTest.brokerSecurityProtocol())
+            
.setControllerListenerName(ListenerName.normalised(clusterTest.controllerListener()))
+            
.setControllerSecurityProtocol(clusterTest.controllerSecurityProtocol())
             .setServerProperties(serverProperties)
             .setPerServerProperties(perServerProperties)
-            .setBrokerSecurityProtocol(clusterTest.brokerSecurityProtocol())
             .setMetadataVersion(clusterTest.metadataVersion())
             .setTags(Arrays.asList(clusterTest.tags()))
             .setFeatures(features)
diff --git 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/README.md
 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/README.md
index a0e815f9385..7a3ea14dc66 100644
--- 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/README.md
+++ 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/README.md
@@ -15,8 +15,8 @@ Arbitrary server properties can also be provided in the 
annotation:
 
 ```java
 @ClusterTest(
-  types = {Type.KRAFT}, 
-  brokerSecurityProtocol = SecurityProtocol.PLAINTEXT, 
+  types = {Type.KRAFT},
+  brokerSecurityProtocol = SecurityProtocol.PLAINTEXT,
   properties = {
     @ClusterProperty(key = "inter.broker.protocol.version", value = "2.7-IV2"),
     @ClusterProperty(key = "socket.send.buffer.bytes", value = "10240"),
diff --git 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
index 1fe87523b4f..de491c7f719 100644
--- 
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
+++ 
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
@@ -284,11 +284,12 @@ public class RaftClusterInvocationContext implements 
TestTemplateInvocationConte
                         .setNumControllerNodes(clusterConfig.numControllers())
                         .setBrokerListenerName(listenerName)
                         
.setBrokerSecurityProtocol(clusterConfig.brokerSecurityProtocol())
+                        
.setControllerListenerName(clusterConfig.controllerListenerName())
+                        
.setControllerSecurityProtocol(clusterConfig.controllerSecurityProtocol())
                         .build();
                 KafkaClusterTestKit.Builder builder = new 
KafkaClusterTestKit.Builder(nodes);
                 // Copy properties into the TestKit builder
                 
clusterConfig.serverProperties().forEach(builder::setConfigProp);
-                // KAFKA-12512 need to pass security protocol and listener 
name here
                 this.clusterTestKit = builder.build();
                 this.clusterTestKit.format();
             }
diff --git 
a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java
 
b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java
index 4e5e2e6b2ce..0f3204b1482 100644
--- 
a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java
+++ 
b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java
@@ -37,6 +37,8 @@ import java.util.stream.Collectors;
 
 import static 
org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
 import static 
org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_SECURITY_PROTOCOL;
+import static 
org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_LISTENER_NAME;
+import static 
org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
 
 public class ClusterConfigTest {
 
@@ -60,6 +62,8 @@ public class ClusterConfigTest {
                 .setTags(Arrays.asList("name", "Generated Test"))
                 .setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT)
                 .setBrokerListenerName(ListenerName.normalised("EXTERNAL"))
+                .setControllerSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)
+                
.setControllerListenerName(ListenerName.normalised("CONTROLLER"))
                 .setTrustStoreFile(trustStoreFile)
                 .setMetadataVersion(MetadataVersion.IBP_0_8_0)
                 .setServerProperties(Collections.singletonMap("broker", 
"broker_value"))
@@ -116,5 +120,7 @@ public class ClusterConfigTest {
         Assertions.assertTrue(expectedDisplayTags.contains("MetadataVersion=" 
+ MetadataVersion.latestTesting()));
         
Assertions.assertTrue(expectedDisplayTags.contains("BrokerSecurityProtocol=" + 
DEFAULT_BROKER_SECURITY_PROTOCOL));
         
Assertions.assertTrue(expectedDisplayTags.contains("BrokerListenerName=" + 
ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME)));
+        
Assertions.assertTrue(expectedDisplayTags.contains("ControllerSecurityProtocol="
 + DEFAULT_CONTROLLER_SECURITY_PROTOCOL));
+        
Assertions.assertTrue(expectedDisplayTags.contains("ControllerListenerName=" + 
ListenerName.normalised(DEFAULT_CONTROLLER_LISTENER_NAME)));
     }
 }

Reply via email to