This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c6294aacef5 KAFKA-17721 Enable to configure listener name and protocol for controller (#17525)
c6294aacef5 is described below
commit c6294aacef5d07b023a688b24bae35eeb3d4fdcb
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Wed Nov 20 23:06:29 2024 +0800
KAFKA-17721 Enable to configure listener name and protocol for controller (#17525)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/common/test/TestKitNodes.java | 20 ++++++++++--
.../apache/kafka/common/test/TestKitNodeTest.java | 13 ++++++--
.../kafka/common/test/api/ClusterConfig.java | 38 ++++++++++++++++++++--
.../apache/kafka/common/test/api/ClusterTest.java | 5 ++-
.../common/test/api/ClusterTestExtensions.java | 4 ++-
.../org/apache/kafka/common/test/api/README.md | 4 +--
.../test/api/RaftClusterInvocationContext.java | 3 +-
.../kafka/common/test/api/ClusterConfigTest.java | 6 ++++
8 files changed, 80 insertions(+), 13 deletions(-)
diff --git
a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index 42a621c9bbb..dfab866d3a6 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -48,6 +48,8 @@ public class TestKitNodes {
public static final int BROKER_ID_OFFSET = 0;
public static final SecurityProtocol DEFAULT_BROKER_SECURITY_PROTOCOL =
SecurityProtocol.PLAINTEXT;
public static final String DEFAULT_BROKER_LISTENER_NAME = "EXTERNAL";
+ public static final SecurityProtocol DEFAULT_CONTROLLER_SECURITY_PROTOCOL
= SecurityProtocol.PLAINTEXT;
+ public static final String DEFAULT_CONTROLLER_LISTENER_NAME = "CONTROLLER";
public static class Builder {
private boolean combined;
@@ -66,10 +68,12 @@ public class TestKitNodes {
public Builder(BootstrapMetadata bootstrapMetadata) {
this.bootstrapMetadata = bootstrapMetadata;
}
- // The brokerListenerName and brokerSecurityProtocol configurations
must
+ // The broker and controller listener name and SecurityProtocol
configurations must
// be kept in sync with the default values in ClusterTest.
private ListenerName brokerListenerName =
ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME);
private SecurityProtocol brokerSecurityProtocol =
DEFAULT_BROKER_SECURITY_PROTOCOL;
+ private ListenerName controllerListenerName =
ListenerName.normalised(DEFAULT_CONTROLLER_LISTENER_NAME);
+ private SecurityProtocol controllerSecurityProtocol =
DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
public Builder setClusterId(String clusterId) {
this.clusterId = clusterId;
@@ -134,6 +138,16 @@ public class TestKitNodes {
return this;
}
+ public Builder setControllerListenerName(ListenerName listenerName) {
+ this.controllerListenerName = listenerName;
+ return this;
+ }
+
+ public Builder setControllerSecurityProtocol(SecurityProtocol
securityProtocol) {
+ this.controllerSecurityProtocol = securityProtocol;
+ return this;
+ }
+
public TestKitNodes build() {
if (numControllerNodes < 0) {
throw new IllegalArgumentException("Invalid negative value for
numControllerNodes");
@@ -145,7 +159,7 @@ public class TestKitNodes {
throw new IllegalArgumentException("Invalid value for
numDisksPerBroker");
}
// TODO: remove this assertion after
https://issues.apache.org/jira/browse/KAFKA-16680 is finished
- if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
+ if (brokerSecurityProtocol != SecurityProtocol.PLAINTEXT ||
controllerSecurityProtocol != SecurityProtocol.PLAINTEXT) {
throw new IllegalArgumentException("Currently only support
PLAINTEXT security protocol");
}
if (baseDirectory == null) {
@@ -203,7 +217,7 @@ public class TestKitNodes {
}
return new TestKitNodes(baseDirectory.toFile().getAbsolutePath(),
clusterId, bootstrapMetadata, controllerNodes, brokerNodes,
- brokerListenerName, brokerSecurityProtocol, new
ListenerName("CONTROLLER"), SecurityProtocol.PLAINTEXT);
+ brokerListenerName, brokerSecurityProtocol,
controllerListenerName, controllerSecurityProtocol);
}
}
diff --git
a/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
b/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
index 1ba7e58abe8..c9adbe3431b 100644
---
a/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
+++
b/test-common/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
@@ -36,18 +36,25 @@ public class TestKitNodeTest {
assertEquals("Currently only support PLAINTEXT security protocol",
assertThrows(IllegalArgumentException.class,
() -> new
TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage());
+ assertEquals("Currently only support PLAINTEXT security protocol",
+ assertThrows(IllegalArgumentException.class,
+ () -> new
TestKitNodes.Builder().setControllerSecurityProtocol(securityProtocol).build()).getMessage());
}
}
@Test
public void testListenerName() {
- ListenerName listenerName = ListenerName.normalised("FOOBAR");
+ ListenerName brokerListenerName = ListenerName.normalised("FOOBAR");
+ ListenerName controllerListenerName =
ListenerName.normalised("BAZQUX");
TestKitNodes testKitNodes = new TestKitNodes.Builder()
.setNumBrokerNodes(1)
.setNumControllerNodes(1)
- .setBrokerListenerName(listenerName)
+ .setBrokerListenerName(brokerListenerName)
.setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT)
+ .setControllerListenerName(controllerListenerName)
+ .setControllerSecurityProtocol(SecurityProtocol.PLAINTEXT)
.build();
- assertEquals(listenerName, testKitNodes.brokerListenerName());
+ assertEquals(brokerListenerName, testKitNodes.brokerListenerName());
+ assertEquals(controllerListenerName,
testKitNodes.controllerListenerName());
}
}
diff --git
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
index 4d5e07dfa05..1697a1fe0a6 100644
---
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
+++
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterConfig.java
@@ -38,6 +38,8 @@ import java.util.stream.Stream;
import static
org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
import static
org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_SECURITY_PROTOCOL;
+import static
org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_LISTENER_NAME;
+import static
org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
/**
* Represents an immutable requested configuration of a Kafka cluster for
integration testing.
@@ -51,6 +53,8 @@ public class ClusterConfig {
private final boolean autoStart;
private final SecurityProtocol brokerSecurityProtocol;
private final ListenerName brokerListenerName;
+ private final SecurityProtocol controllerSecurityProtocol;
+ private final ListenerName controllerListenerName;
private final File trustStoreFile;
private final MetadataVersion metadataVersion;
@@ -66,7 +70,8 @@ public class ClusterConfig {
@SuppressWarnings("checkstyle:ParameterNumber")
private ClusterConfig(Set<Type> types, int brokers, int controllers, int
disksPerBroker, boolean autoStart,
- SecurityProtocol brokerSecurityProtocol, ListenerName
brokerListenerName, File trustStoreFile,
+ SecurityProtocol brokerSecurityProtocol, ListenerName
brokerListenerName,
+ SecurityProtocol controllerSecurityProtocol, ListenerName
controllerListenerName, File trustStoreFile,
MetadataVersion metadataVersion, Map<String, String>
serverProperties, Map<String, String> producerProperties,
Map<String, String> consumerProperties, Map<String, String>
adminClientProperties, Map<String, String> saslServerProperties,
Map<String, String> saslClientProperties, Map<Integer,
Map<String, String>> perServerProperties, List<String> tags,
@@ -83,6 +88,8 @@ public class ClusterConfig {
this.autoStart = autoStart;
this.brokerSecurityProtocol =
Objects.requireNonNull(brokerSecurityProtocol);
this.brokerListenerName = Objects.requireNonNull(brokerListenerName);
+ this.controllerSecurityProtocol =
Objects.requireNonNull(controllerSecurityProtocol);
+ this.controllerListenerName =
Objects.requireNonNull(controllerListenerName);
this.trustStoreFile = trustStoreFile;
this.metadataVersion = Objects.requireNonNull(metadataVersion);
this.serverProperties = Objects.requireNonNull(serverProperties);
@@ -144,6 +151,14 @@ public class ClusterConfig {
return brokerSecurityProtocol;
}
+ public ListenerName controllerListenerName() {
+ return controllerListenerName;
+ }
+
+ public SecurityProtocol controllerSecurityProtocol() {
+ return controllerSecurityProtocol;
+ }
+
public ListenerName brokerListenerName() {
return brokerListenerName;
}
@@ -173,6 +188,8 @@ public class ClusterConfig {
displayTags.add("MetadataVersion=" + metadataVersion);
displayTags.add("BrokerSecurityProtocol=" +
brokerSecurityProtocol.name());
displayTags.add("BrokerListenerName=" + brokerListenerName);
+ displayTags.add("ControllerSecurityProtocol=" +
controllerSecurityProtocol.name());
+ displayTags.add("ControllerListenerName=" + controllerListenerName);
return displayTags;
}
@@ -185,6 +202,8 @@ public class ClusterConfig {
.setAutoStart(true)
.setBrokerSecurityProtocol(DEFAULT_BROKER_SECURITY_PROTOCOL)
.setBrokerListenerName(ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME))
+
.setControllerSecurityProtocol(DEFAULT_CONTROLLER_SECURITY_PROTOCOL)
+
.setControllerListenerName(ListenerName.normalised(DEFAULT_CONTROLLER_LISTENER_NAME))
.setMetadataVersion(MetadataVersion.latestTesting());
}
@@ -201,6 +220,8 @@ public class ClusterConfig {
.setAutoStart(clusterConfig.autoStart)
.setBrokerSecurityProtocol(clusterConfig.brokerSecurityProtocol)
.setBrokerListenerName(clusterConfig.brokerListenerName)
+
.setControllerSecurityProtocol(clusterConfig.controllerSecurityProtocol)
+
.setControllerListenerName(clusterConfig.controllerListenerName)
.setTrustStoreFile(clusterConfig.trustStoreFile)
.setMetadataVersion(clusterConfig.metadataVersion)
.setServerProperties(clusterConfig.serverProperties)
@@ -222,6 +243,8 @@ public class ClusterConfig {
private boolean autoStart;
private SecurityProtocol brokerSecurityProtocol;
private ListenerName brokerListenerName;
+ private SecurityProtocol controllerSecurityProtocol;
+ private ListenerName controllerListenerName;
private File trustStoreFile;
private MetadataVersion metadataVersion;
private Map<String, String> serverProperties = Collections.emptyMap();
@@ -271,6 +294,16 @@ public class ClusterConfig {
return this;
}
+ public Builder setControllerSecurityProtocol(SecurityProtocol
securityProtocol) {
+ this.controllerSecurityProtocol = securityProtocol;
+ return this;
+ }
+
+ public Builder setControllerListenerName(ListenerName listenerName) {
+ this.controllerListenerName = listenerName;
+ return this;
+ }
+
public Builder setTrustStoreFile(File trustStoreFile) {
this.trustStoreFile = trustStoreFile;
return this;
@@ -329,7 +362,8 @@ public class ClusterConfig {
}
public ClusterConfig build() {
- return new ClusterConfig(types, brokers, controllers,
disksPerBroker, autoStart, brokerSecurityProtocol, brokerListenerName,
+ return new ClusterConfig(types, brokers, controllers,
disksPerBroker, autoStart,
+ brokerSecurityProtocol, brokerListenerName,
controllerSecurityProtocol, controllerListenerName,
trustStoreFile, metadataVersion, serverProperties,
producerProperties, consumerProperties,
adminClientProperties, saslServerProperties,
saslClientProperties,
perServerProperties, tags, features);
diff --git
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
index 574ae85abb3..86aba1030d8 100644
---
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
+++
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTest.java
@@ -31,6 +31,7 @@ import java.lang.annotation.Target;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.RetentionPolicy.RUNTIME;
import static
org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
+import static
org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_LISTENER_NAME;
@Documented
@Target({METHOD})
@@ -44,11 +45,13 @@ public @interface ClusterTest {
int controllers() default 0;
int disksPerBroker() default 0;
AutoStart autoStart() default AutoStart.DEFAULT;
- // The brokerListenerName and brokerSecurityProtocol configurations must
+ // The broker/controller listener name and SecurityProtocol configurations
must
// be kept in sync with the default values in TestKitNodes, as many tests
// directly use TestKitNodes without relying on the ClusterTest annotation.
SecurityProtocol brokerSecurityProtocol() default
SecurityProtocol.PLAINTEXT;
String brokerListener() default DEFAULT_BROKER_LISTENER_NAME;
+ SecurityProtocol controllerSecurityProtocol() default
SecurityProtocol.PLAINTEXT;
+ String controllerListener() default DEFAULT_CONTROLLER_LISTENER_NAME;
MetadataVersion metadataVersion() default MetadataVersion.IBP_4_0_IV3;
ClusterConfigProperty[] serverProperties() default {};
// users can add tags that they want to display in test
diff --git
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java
index 827fb0cf67c..180acd96306 100644
---
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java
+++
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/ClusterTestExtensions.java
@@ -250,9 +250,11 @@ public class ClusterTestExtensions implements
TestTemplateInvocationContextProvi
.setDisksPerBroker(clusterTest.disksPerBroker() == 0 ?
defaults.disksPerBroker() : clusterTest.disksPerBroker())
.setAutoStart(clusterTest.autoStart() == AutoStart.DEFAULT ?
defaults.autoStart() : clusterTest.autoStart() == AutoStart.YES)
.setBrokerListenerName(ListenerName.normalised(clusterTest.brokerListener()))
+ .setBrokerSecurityProtocol(clusterTest.brokerSecurityProtocol())
+
.setControllerListenerName(ListenerName.normalised(clusterTest.controllerListener()))
+
.setControllerSecurityProtocol(clusterTest.controllerSecurityProtocol())
.setServerProperties(serverProperties)
.setPerServerProperties(perServerProperties)
- .setBrokerSecurityProtocol(clusterTest.brokerSecurityProtocol())
.setMetadataVersion(clusterTest.metadataVersion())
.setTags(Arrays.asList(clusterTest.tags()))
.setFeatures(features)
diff --git
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/README.md
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/README.md
index a0e815f9385..7a3ea14dc66 100644
---
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/README.md
+++
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/README.md
@@ -15,8 +15,8 @@ Arbitrary server properties can also be provided in the
annotation:
```java
@ClusterTest(
- types = {Type.KRAFT},
- brokerSecurityProtocol = SecurityProtocol.PLAINTEXT,
+ types = {Type.KRAFT},
+ brokerSecurityProtocol = SecurityProtocol.PLAINTEXT,
properties = {
@ClusterProperty(key = "inter.broker.protocol.version", value = "2.7-IV2"),
@ClusterProperty(key = "socket.send.buffer.bytes", value = "10240"),
diff --git
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
index 1fe87523b4f..de491c7f719 100644
---
a/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
+++
b/test-common/test-common-api/src/main/java/org/apache/kafka/common/test/api/RaftClusterInvocationContext.java
@@ -284,11 +284,12 @@ public class RaftClusterInvocationContext implements
TestTemplateInvocationConte
.setNumControllerNodes(clusterConfig.numControllers())
.setBrokerListenerName(listenerName)
.setBrokerSecurityProtocol(clusterConfig.brokerSecurityProtocol())
+
.setControllerListenerName(clusterConfig.controllerListenerName())
+
.setControllerSecurityProtocol(clusterConfig.controllerSecurityProtocol())
.build();
KafkaClusterTestKit.Builder builder = new
KafkaClusterTestKit.Builder(nodes);
// Copy properties into the TestKit builder
clusterConfig.serverProperties().forEach(builder::setConfigProp);
- // KAFKA-12512 need to pass security protocol and listener
name here
this.clusterTestKit = builder.build();
this.clusterTestKit.format();
}
diff --git
a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java
b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java
index 4e5e2e6b2ce..0f3204b1482 100644
---
a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java
+++
b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterConfigTest.java
@@ -37,6 +37,8 @@ import java.util.stream.Collectors;
import static
org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_LISTENER_NAME;
import static
org.apache.kafka.common.test.TestKitNodes.DEFAULT_BROKER_SECURITY_PROTOCOL;
+import static
org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_LISTENER_NAME;
+import static
org.apache.kafka.common.test.TestKitNodes.DEFAULT_CONTROLLER_SECURITY_PROTOCOL;
public class ClusterConfigTest {
@@ -60,6 +62,8 @@ public class ClusterConfigTest {
.setTags(Arrays.asList("name", "Generated Test"))
.setBrokerSecurityProtocol(SecurityProtocol.PLAINTEXT)
.setBrokerListenerName(ListenerName.normalised("EXTERNAL"))
+ .setControllerSecurityProtocol(SecurityProtocol.SASL_PLAINTEXT)
+
.setControllerListenerName(ListenerName.normalised("CONTROLLER"))
.setTrustStoreFile(trustStoreFile)
.setMetadataVersion(MetadataVersion.IBP_0_8_0)
.setServerProperties(Collections.singletonMap("broker",
"broker_value"))
@@ -116,5 +120,7 @@ public class ClusterConfigTest {
Assertions.assertTrue(expectedDisplayTags.contains("MetadataVersion="
+ MetadataVersion.latestTesting()));
Assertions.assertTrue(expectedDisplayTags.contains("BrokerSecurityProtocol=" +
DEFAULT_BROKER_SECURITY_PROTOCOL));
Assertions.assertTrue(expectedDisplayTags.contains("BrokerListenerName=" +
ListenerName.normalised(DEFAULT_BROKER_LISTENER_NAME)));
+
Assertions.assertTrue(expectedDisplayTags.contains("ControllerSecurityProtocol="
+ DEFAULT_CONTROLLER_SECURITY_PROTOCOL));
+
Assertions.assertTrue(expectedDisplayTags.contains("ControllerListenerName=" +
ListenerName.normalised(DEFAULT_CONTROLLER_LISTENER_NAME)));
}
}