This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 55577e73b2a KAFKA-18083 ClusterInstance custom controllerListener not work (#17932)
55577e73b2a is described below
commit 55577e73b2a30a7f47cb2de3c22973020d61e230
Author: Kuan-Po Tseng <[email protected]>
AuthorDate: Tue Nov 26 22:01:21 2024 +0800
KAFKA-18083 ClusterInstance custom controllerListener not work (#17932)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../org/apache/kafka/common/test/KafkaClusterTestKit.java | 11 ++++++-----
.../kafka/common/test/api/ClusterTestExtensionsTest.java | 8 ++++++++
2 files changed, 14 insertions(+), 5 deletions(-)
diff --git a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index 9d0c4daa5e7..ee1bf34a5d1 100644
--- a/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ b/test-common/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -111,7 +111,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final String brokerSecurityProtocol;
private final String controllerSecurityProtocol;
-
public Builder(TestKitNodes nodes) {
this.nodes = nodes;
this.brokerListenerName = nodes.brokerListenerName().value();
@@ -169,7 +168,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
append("@").
append("localhost").
append(":").
- append(socketFactoryManager.getOrCreatePortForListener(nodeId, "CONTROLLER"));
+ append(socketFactoryManager.getOrCreatePortForListener(nodeId, controllerListenerName));
prefix = ",";
}
props.put(QuorumConfig.QUORUM_VOTERS_CONFIG,
quorumVoterStringBuilder.toString());
@@ -199,7 +198,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
try {
baseDirectory = new File(nodes.baseDirectory());
for (TestKitNode node : nodes.controllerNodes().values()) {
- socketFactoryManager.getOrCreatePortForListener(node.id(), "CONTROLLER");
+ socketFactoryManager.getOrCreatePortForListener(node.id(), controllerListenerName);
}
for (TestKitNode node : nodes.controllerNodes().values()) {
setupNodeDirectories(baseDirectory,
node.metadataDirectory(), Collections.emptyList());
@@ -316,6 +315,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final File baseDirectory;
private final SimpleFaultHandlerFactory faultHandlerFactory;
private final PreboundSocketFactoryManager socketFactoryManager;
+ private final String controllerListenerName;
private KafkaClusterTestKit(
TestKitNodes nodes,
@@ -338,6 +338,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
this.baseDirectory = baseDirectory;
this.faultHandlerFactory = faultHandlerFactory;
this.socketFactoryManager = socketFactoryManager;
+ this.controllerListenerName = nodes.controllerListenerName().value();
}
public void format() throws Exception {
@@ -389,7 +390,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
nodes.bootstrapMetadata().featureLevel(KRaftVersion.FEATURE_NAME));
formatter.setUnstableFeatureVersionsEnabled(true);
formatter.setIgnoreFormatted(false);
- formatter.setControllerListenerName("CONTROLLER");
+ formatter.setControllerListenerName(controllerListenerName);
if (writeMetadataDirectory) {
formatter.setMetadataLogDirectory(ensemble.metadataLogDir().get());
} else {
@@ -400,7 +401,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
String prefix = "";
for (TestKitNode controllerNode :
nodes.controllerNodes().values()) {
int port = socketFactoryManager.
- getOrCreatePortForListener(controllerNode.id(), "CONTROLLER");
+ getOrCreatePortForListener(controllerNode.id(), controllerListenerName);
dynamicVotersBuilder.append(prefix);
prefix = ",";
dynamicVotersBuilder.append(String.format("%d@localhost:%d:%s",
diff --git a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
index b96e8c28c08..bc20aa14273 100644
--- a/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
+++ b/test-common/test-common-api/src/test/java/org/apache/kafka/common/test/api/ClusterTestExtensionsTest.java
@@ -322,4 +322,12 @@ public class ClusterTestExtensionsTest {
assertEquals(value, records.get(0).value());
}
}
+
+ @ClusterTest(types = {Type.CO_KRAFT, Type.KRAFT}, controllerListener = "FOO")
+ public void testControllerListenerName(ClusterInstance cluster) throws ExecutionException, InterruptedException {
+ assertEquals("FOO", cluster.controllerListenerName().get().value());
+ try (Admin admin = cluster.admin(Map.of(), true)) {
+ assertEquals(1, admin.describeMetadataQuorum().quorumInfo().get().nodes().size());
+ }
+ }
}