This is an automated email from the ASF dual-hosted git repository.
soarez pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6a1c8b465b4 KAFKA-18447 Support SASL_SSL protocol with
java.security.auth.login.config (#20376)
6a1c8b465b4 is described below
commit 6a1c8b465b4dfda4b91897ab8b5908da6b469c91
Author: Ken Huang <[email protected]>
AuthorDate: Sat May 9 04:17:26 2026 +0800
KAFKA-18447 Support SASL_SSL protocol with java.security.auth.login.config
(#20376)
The new test framework does not yet support the SASL_SSL security
protocol; we should add support for it.
Reviewers: Igor Soarez <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
build.gradle | 1 +
.../apache/kafka/common/test/ClusterInstance.java | 10 +--
.../org/apache/kafka/common/test/JaasUtils.java | 2 +-
.../kafka/common/test/KafkaClusterTestKit.java | 42 +++++++++---
.../org/apache/kafka/common/test/SslManager.java | 77 ++++++++++++++++++++++
.../org/apache/kafka/common/test/TestKitNodes.java | 6 +-
.../org/apache/kafka/common/test/TestUtils.java | 10 ---
.../test/junit/RaftClusterInvocationContext.java | 27 ++++++++
.../apache/kafka/common/test/TestKitNodeTest.java | 6 +-
.../test/junit/ClusterTestExtensionsTest.java | 32 +++++++++
10 files changed, 183 insertions(+), 30 deletions(-)
diff --git a/build.gradle b/build.gradle
index be1547d4d51..0f90b9f2298 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1666,6 +1666,7 @@ project(':test-common:test-common-runtime') {
implementation libs.junitJupiter
implementation libs.jacksonDataformatYaml
implementation libs.slf4jApi
+ implementation testFixtures(project(':clients'))
testImplementation libs.junitJupiter
testImplementation libs.mockitoCore
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index 0fda41dfc9d..f41fee0ff4f 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -150,7 +150,7 @@ public interface ClusterInstance {
props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
ByteArraySerializer.class.getName());
props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers());
- return new KafkaProducer<>(setClientSaslConfig(props));
+ return new
KafkaProducer<>(setClientSaslConfig(setClientSslConfig(props)));
}
default <K, V> Producer<K, V> producer() {
@@ -164,7 +164,7 @@ public interface ClusterInstance {
props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" +
TestUtils.randomString(5));
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers());
- return new KafkaConsumer<>(setClientSaslConfig(props));
+ return new
KafkaConsumer<>(setClientSaslConfig(setClientSslConfig(props)));
}
default <K, V> Consumer<K, V> consumer() {
@@ -189,7 +189,7 @@ public interface ClusterInstance {
}
props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" +
TestUtils.randomString(5));
props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers());
- return new KafkaShareConsumer<>(setClientSaslConfig(props),
keyDeserializer, valueDeserializer);
+ return new
KafkaShareConsumer<>(setClientSaslConfig(setClientSslConfig(props)),
keyDeserializer, valueDeserializer);
}
default Admin admin(Map<String, Object> configs, boolean
usingBootstrapControllers) {
@@ -201,7 +201,7 @@ public interface ClusterInstance {
props.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers());
props.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
}
- return Admin.create(setClientSaslConfig(props));
+ return Admin.create(setClientSaslConfig(setClientSslConfig(props)));
}
default Map<String, Object> setClientSaslConfig(Map<String, Object>
configs) {
@@ -220,6 +220,8 @@ public interface ClusterInstance {
return props;
}
+ /**
+  * Returns client properties derived from {@code configs} with SSL-related settings applied.
+  * The producer, consumer, share-consumer and admin factories of this interface pass their
+  * properties through this method first and hand the result to
+  * {@link #setClientSaslConfig(Map)}.
+  *
+  * @param configs the client properties assembled so far
+  * @return the properties to use for the client
+  */
+ Map<String, Object> setClientSslConfig(Map<String, Object> configs);
+
default Admin admin(Map<String, Object> configs) {
return admin(configs, false);
}
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/JaasUtils.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/JaasUtils.java
index fb319d6980c..2c658c94805 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/JaasUtils.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/JaasUtils.java
@@ -47,7 +47,7 @@ public class JaasUtils {
public static final String KAFKA_PLAIN_ADMIN_PASSWORD =
"plain-admin-secret";
public static File writeJaasContextsToFile(Set<JaasSection> jaasSections)
throws IOException {
- File jaasFile = TestUtils.tempFile();
+ File jaasFile = org.apache.kafka.test.TestUtils.tempFile();
try (FileOutputStream fileStream = new FileOutputStream(jaasFile);
OutputStreamWriter writer = new OutputStreamWriter(fileStream,
StandardCharsets.UTF_8)) {
writer.write(String.join("",
jaasSections.stream().map(Object::toString).toArray(String[]::new)));
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index b152a3b1dbe..fa498e190ea 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -143,7 +143,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
return this;
}
- private KafkaConfig createNodeConfig(TestKitNode node) throws
IOException {
+ private KafkaConfig createNodeConfig(TestKitNode node, Map<String,
Object> sslConfig) throws IOException {
TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
TestKitNode controllerNode =
nodes.controllerNodes().get(node.id());
@@ -151,13 +151,13 @@ public class KafkaClusterTestKit implements AutoCloseable
{
props.put(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG,
Long.toString(TimeUnit.MINUTES.toMillis(10)));
props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, roles(node.id()));
- props.put(KRaftConfigs.NODE_ID_CONFIG,
- Integer.toString(node.id()));
+ props.put(KRaftConfigs.NODE_ID_CONFIG,
Integer.toString(node.id()));
+
// In combined mode, always prefer the metadata log directory of
the controller node.
if (controllerNode != null) {
props.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG,
controllerNode.metadataDirectory());
- setSecurityProtocolProps(props, controllerSecurityProtocol);
+ setSecurityProtocolProps(props, controllerSecurityProtocol,
sslConfig);
} else {
props.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG,
node.metadataDirectory());
@@ -166,7 +166,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
// Set the log.dirs according to the broker node setting (if
there is a broker node)
props.put(LOG_DIRS_CONFIG,
String.join(",", brokerNode.logDataDirectories()));
- setSecurityProtocolProps(props, brokerSecurityProtocol);
+ setSecurityProtocolProps(props, brokerSecurityProtocol,
sslConfig);
} else {
// Set log.dirs equal to the metadata directory if there is
just a controller.
props.put(LOG_DIRS_CONFIG,
@@ -228,7 +228,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
return new KafkaConfig(props, false);
}
- private void setSecurityProtocolProps(Map<String, Object> props,
String securityProtocol) {
+ private void setSecurityProtocolProps(
+ Map<String, Object> props,
+ String securityProtocol,
+ Map<String, Object> sslConfig
+ ) {
if (securityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name))
{
props.putIfAbsent(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
"PLAIN");
props.putIfAbsent(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
"PLAIN");
@@ -236,11 +240,20 @@ public class KafkaClusterTestKit implements AutoCloseable
{
props.putIfAbsent(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG,
StandardAuthorizer.class.getName());
props.putIfAbsent(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG,
"false");
props.putIfAbsent(StandardAuthorizer.SUPER_USERS_CONFIG,
"User:" + JaasUtils.KAFKA_PLAIN_ADMIN);
+ } else if
(securityProtocol.equals(SecurityProtocol.SASL_SSL.name)) {
+
props.putIfAbsent(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
"PLAIN");
+
props.putIfAbsent(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
"PLAIN");
+
props.putIfAbsent(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG,
"PLAIN");
+ props.putIfAbsent(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG,
StandardAuthorizer.class.getName());
+
props.putIfAbsent(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG,
"false");
+ props.putIfAbsent(StandardAuthorizer.SUPER_USERS_CONFIG,
"User:" + JaasUtils.KAFKA_PLAIN_ADMIN);
+ sslConfig.forEach(props::putIfAbsent);
}
}
private Optional<File> maybeSetupJaasFile() throws Exception {
- if
(brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
+ if
(brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name) ||
+
brokerSecurityProtocol.equals(SecurityProtocol.SASL_SSL.name)) {
File file = JaasUtils.writeJaasContextsToFile(Set.of(
new
JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME,
List.of(
@@ -272,6 +285,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
Map<Integer, SharedServer> jointServers = new HashMap<>();
File baseDirectory = null;
Optional<File> jaasFile = maybeSetupJaasFile();
+ SslManager sslManager = new SslManager();
+ Map<String, Object> sslConfig = sslManager.createSslConfig();
try {
baseDirectory = new File(nodes.baseDirectory());
for (TestKitNode node : nodes.controllerNodes().values()) {
@@ -282,7 +297,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
}
for (TestKitNode node : nodes.controllerNodes().values()) {
setupNodeDirectories(baseDirectory,
node.metadataDirectory(), List.of());
- KafkaConfig config = createNodeConfig(node);
+ KafkaConfig config = createNodeConfig(node, sslConfig);
SharedServer sharedServer = new SharedServer(
config,
node.initialMetaPropertiesEnsemble(),
@@ -310,7 +325,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
for (TestKitNode node : nodes.brokerNodes().values()) {
SharedServer sharedServer = jointServers.get(node.id());
if (sharedServer == null) {
- KafkaConfig config = createNodeConfig(node);
+ KafkaConfig config = createNodeConfig(node, sslConfig);
sharedServer = new SharedServer(
config,
node.initialMetaPropertiesEnsemble(),
@@ -354,6 +369,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
faultHandlerFactory,
socketFactoryManager,
jaasFile,
+ sslManager,
standalone,
initialVoterSet,
deleteOnClose);
@@ -401,6 +417,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
private final PreboundSocketFactoryManager socketFactoryManager;
private final String controllerListenerName;
private final Optional<File> jaasFile;
+ private final SslManager sslManager;
private final boolean standalone;
private final Optional<Map<Integer, Uuid>> initialVoterSet;
private final boolean deleteOnClose;
@@ -413,6 +430,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
SimpleFaultHandlerFactory faultHandlerFactory,
PreboundSocketFactoryManager socketFactoryManager,
Optional<File> jaasFile,
+ SslManager sslManager,
boolean standalone,
Optional<Map<Integer, Uuid>> initialVoterSet,
boolean deleteOnClose
@@ -432,6 +450,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
this.socketFactoryManager = socketFactoryManager;
this.controllerListenerName = nodes.controllerListenerName().value();
this.jaasFile = jaasFile;
+ this.sslManager = sslManager;
this.standalone = standalone;
this.initialVoterSet = initialVoterSet;
this.deleteOnClose = deleteOnClose;
@@ -695,6 +714,10 @@ public class KafkaClusterTestKit implements AutoCloseable {
return faultHandlerFactory.nonFatalFaultHandler();
}
+ /**
+  * Returns the {@link SslManager} that was handed to this test kit's constructor.
+  * The same instance is closed by {@link #close()}.
+  */
+ public SslManager sslManager() {
+ return sslManager;
+ }
+
@Override
public void close() throws Exception {
List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
@@ -722,6 +745,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
if (jaasFile.isPresent()) {
Utils.delete(jaasFile.get());
}
+ sslManager.close();
}
} catch (Exception e) {
for (Entry<String, Future<?>> entry : futureEntries) {
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/SslManager.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/SslManager.java
new file mode 100644
index 00000000000..bbb537c360c
--- /dev/null
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/SslManager.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.test;
+
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.network.ConnectionMode;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestSslUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Map;
+
+/**
+ * Owns the temporary key store and trust store files used for the SSL configuration of a
+ * test cluster and of the clients connecting to it.
+ *
+ * <p>The trust store file is created by the constructor. {@link #createSslConfig()} builds the
+ * server-mode configuration against that file and remembers the key store location reported in
+ * the resulting configuration. {@link #createClientSslConfig()} builds a client-mode
+ * configuration that reuses the same trust store file. {@link #close()} deletes both files.
+ */
+public class SslManager {
+
+    private static final Logger log = LoggerFactory.getLogger(SslManager.class);
+    // Assigned by createSslConfig(); remains null until that method has succeeded.
+    private File keyStoreFile;
+    private final File trustStoreFile;
+
+    /**
+     * Creates the temporary trust store file.
+     *
+     * @throws RuntimeException if the temporary file cannot be created
+     */
+    public SslManager() {
+        try {
+            trustStoreFile = org.apache.kafka.test.TestUtils.tempFile("kafka.server.truststore", ".jks");
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create truststore file", e);
+        }
+    }
+
+    /**
+     * Builds a server-mode SSL configuration with {@code createNewTrustStore(trustStoreFile)}
+     * and records the key store location found in the built configuration.
+     *
+     * @return the configuration map returned by the builder
+     * @throws RuntimeException wrapping the {@link IOException} or
+     *         {@link GeneralSecurityException} raised while building
+     */
+    public Map<String, Object> createSslConfig() {
+        try {
+            Map<String, Object> config = new TestSslUtils.SslConfigsBuilder(ConnectionMode.SERVER)
+                .createNewTrustStore(trustStoreFile)
+                .build();
+            keyStoreFile = new File((String) config.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
+            log.info("Created unified SSL config - KeyStore: {}, TrustStore: {}", keyStoreFile.getPath(), trustStoreFile.getPath());
+            return config;
+        } catch (IOException | GeneralSecurityException e) {
+            throw new RuntimeException("Failed to create SSL config", e);
+        }
+    }
+
+    /**
+     * Builds a client-mode SSL configuration with {@code useExistingTrustStore(trustStoreFile)}.
+     *
+     * @return the configuration map returned by the builder
+     * @throws RuntimeException wrapping the {@link IOException} or
+     *         {@link GeneralSecurityException} raised while building
+     */
+    public Map<String, Object> createClientSslConfig() {
+        try {
+            return new TestSslUtils.SslConfigsBuilder(ConnectionMode.CLIENT)
+                .useExistingTrustStore(trustStoreFile)
+                .build();
+        } catch (IOException | GeneralSecurityException e) {
+            throw new RuntimeException("Failed to create client SSL config", e);
+        }
+    }
+
+    /**
+     * Deletes the key store file, if one has been recorded, and the trust store file.
+     * The trust store is deleted even when deleting the key store fails.
+     *
+     * @throws IOException if a file cannot be deleted
+     */
+    public void close() throws IOException {
+        try {
+            if (keyStoreFile != null) {
+                Utils.delete(keyStoreFile);
+            }
+        } finally {
+            // trustStoreFile is final and assigned in the constructor, so it is never null here.
+            Utils.delete(trustStoreFile);
+        }
+    }
+}
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index 30c597915c1..0de47d9b771 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -156,9 +156,9 @@ public class TestKitNodes {
throw new IllegalArgumentException("Invalid value for
numDisksPerBroker");
}
// TODO: remove this assertion after
https://issues.apache.org/jira/browse/KAFKA-16680 is finished
- if ((brokerSecurityProtocol != SecurityProtocol.PLAINTEXT &&
brokerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT) ||
- (controllerSecurityProtocol != SecurityProtocol.PLAINTEXT &&
controllerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT)) {
- throw new IllegalArgumentException("Currently only support
PLAINTEXT / SASL_PLAINTEXT security protocol");
+ // Validate each listener against its own protocol: the controller clause must test
+ // controllerSecurityProtocol throughout, otherwise a SASL_SSL controller paired with a
+ // PLAINTEXT broker is rejected and an unsupported controller protocol slips through
+ // whenever the broker uses SASL_SSL.
+ if ((brokerSecurityProtocol != SecurityProtocol.PLAINTEXT && brokerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT && brokerSecurityProtocol != SecurityProtocol.SASL_SSL) ||
+     (controllerSecurityProtocol != SecurityProtocol.PLAINTEXT && controllerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT && controllerSecurityProtocol != SecurityProtocol.SASL_SSL)) {
+     throw new IllegalArgumentException("Currently only support PLAINTEXT / SASL_PLAINTEXT / SASL_SSL security protocol");
+ }
}
if (baseDirectory == null) {
this.baseDirectory = TestUtils.tempDirectory().toPath();
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
index 4449145c0b5..16ed515ccd0 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
@@ -50,16 +50,6 @@ class TestUtils {
private static final long DEFAULT_POLL_INTERVAL_MS = 100;
private static final long DEFAULT_MAX_WAIT_MS = 15_000;
- /**
- * Create an empty file in the default temporary-file directory, using
`kafka` as the prefix and `tmp` as the
- * suffix to generate its name.
- */
- public static File tempFile() throws IOException {
- final File file = Files.createTempFile("kafka", ".tmp").toFile();
- file.deleteOnExit();
- return file;
- }
-
/**
* Generate a random string of letters and digits of the given length
*
diff --git
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
index 704d4a7a563..f1ad8080cb4 100644
---
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
+++
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
@@ -20,8 +20,13 @@ import kafka.server.BrokerServer;
import kafka.server.ControllerServer;
import kafka.server.KafkaBroker;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.JaasUtils;
import org.apache.kafka.common.test.KafkaClusterTestKit;
import org.apache.kafka.common.test.TestKitNodes;
import org.apache.kafka.common.test.api.ClusterConfig;
@@ -41,6 +46,7 @@ import org.junit.jupiter.api.extension.Extension;
import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
@@ -175,6 +181,27 @@ public class RaftClusterInvocationContext implements
TestTemplateInvocationConte
.orElseThrow(() -> new RuntimeException("No controllers or
brokers!"));
}
+ /**
+  * Copies {@code configs} and, when the broker security protocol is SASL_SSL, completes the
+  * copy for a SASL_SSL client: the security protocol, the PLAIN mechanism and a JAAS entry
+  * for the PLAIN admin user are added only where the caller has not set them. If the test
+  * kit exposes an SslManager, its client SSL configuration is then written over the copy
+  * and the endpoint identification algorithm is set to the empty string.
+  *
+  * @param configs the client properties assembled so far; not modified
+  * @return a new map holding the resulting client properties
+  */
+ @Override
+ public Map<String, Object> setClientSslConfig(Map<String, Object> configs) {
+     Map<String, Object> clientProps = new HashMap<>(configs);
+     if (config().brokerSecurityProtocol() != SecurityProtocol.SASL_SSL) {
+         return clientProps;
+     }
+
+     clientProps.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+     clientProps.putIfAbsent(SaslConfigs.SASL_MECHANISM, "PLAIN");
+     String adminJaasEntry = String.format(
+         "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"%s\" password=\"%s\";",
+         JaasUtils.KAFKA_PLAIN_ADMIN,
+         JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD
+     );
+     clientProps.putIfAbsent(SaslConfigs.SASL_JAAS_CONFIG, adminJaasEntry);
+
+     if (clusterTestKit.sslManager() == null) {
+         return clientProps;
+     }
+     // Order matters: the SslManager settings go in first, then endpoint identification is overridden.
+     clientProps.putAll(clusterTestKit.sslManager().createClientSslConfig());
+     clientProps.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+     return clientProps;
+ }
+
@Override
public Type type() {
return isCombined ? Type.CO_KRAFT : Type.KRAFT;
diff --git
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
index 7b1caf3383a..4a9dcaf9e5e 100644
---
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
+++
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
@@ -32,11 +32,11 @@ public class TestKitNodeTest {
@ParameterizedTest
@EnumSource(SecurityProtocol.class)
public void testSecurityProtocol(SecurityProtocol securityProtocol) {
- if (securityProtocol != SecurityProtocol.PLAINTEXT && securityProtocol
!= SecurityProtocol.SASL_PLAINTEXT) {
- assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT
security protocol",
+ if (securityProtocol != SecurityProtocol.PLAINTEXT && securityProtocol
!= SecurityProtocol.SASL_PLAINTEXT && securityProtocol !=
SecurityProtocol.SASL_SSL) {
+ assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT /
SASL_SSL security protocol",
assertThrows(IllegalArgumentException.class,
() -> new
TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage());
- assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT
security protocol",
+ assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT /
SASL_SSL security protocol",
assertThrows(IllegalArgumentException.class,
() -> new
TestKitNodes.Builder().setControllerSecurityProtocol(securityProtocol).build()).getMessage());
}
diff --git
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
index ac6c05e1151..e2862938f82 100644
---
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
+++
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.config.ConfigResource;
@@ -77,6 +78,8 @@ import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
+import scala.jdk.javaapi.CollectionConverters;
+
import static
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
@@ -526,6 +529,35 @@ public class ClusterTestExtensionsTest {
}
)
public void testSaslPlaintextWithController(ClusterInstance
clusterInstance) throws CancellationException, ExecutionException,
InterruptedException {
+ assertSecurityProtocol(clusterInstance,
SecurityProtocol.SASL_PLAINTEXT, "Expected broker to have SASL_PLAINTEXT
data-plane listener");
+ testSecurityProtocol(clusterInstance);
+ }
+
+ /**
+  * Runs against a cluster whose broker and controller security protocols are both SASL_SSL:
+  * first checks that every alive broker has a SASL_SSL data-plane listener, then runs the
+  * shared security-protocol checks.
+  */
+ @ClusterTest(
+     types = {Type.KRAFT, Type.CO_KRAFT},
+     brokerSecurityProtocol = SecurityProtocol.SASL_SSL,
+     controllerSecurityProtocol = SecurityProtocol.SASL_SSL,
+     serverProperties = {
+         @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+         @ClusterConfigProperty(key = GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+     }
+ )
+ public void testSaslSslWithController(ClusterInstance clusterInstance)
+         throws CancellationException, ExecutionException, InterruptedException {
+     assertSecurityProtocol(
+         clusterInstance,
+         SecurityProtocol.SASL_SSL,
+         "Expected broker to have SASL_SSL data-plane listener"
+     );
+     testSecurityProtocol(clusterInstance);
+ }
+
+ /**
+  * Asserts that every alive broker of the cluster has at least one data-plane listener whose
+  * security protocol is {@code expectedProtocol}; {@code message} is the failure message.
+  */
+ private static void assertSecurityProtocol(ClusterInstance clusterInstance, SecurityProtocol expectedProtocol, String message) {
+     clusterInstance.aliveBrokers().values().forEach(broker -> {
+         List<Endpoint> dataPlaneEndpoints = CollectionConverters.asJava(broker.config().dataPlaneListeners());
+         boolean hasExpectedProtocol = dataPlaneEndpoints.stream()
+             .map(Endpoint::securityProtocol)
+             .anyMatch(protocol -> protocol == expectedProtocol);
+         assertTrue(hasExpectedProtocol, message);
+     });
+ }
+
+ private static void testSecurityProtocol(ClusterInstance clusterInstance)
throws InterruptedException, ExecutionException {
// default ClusterInstance#admin helper with admin credentials
try (Admin admin = clusterInstance.admin(Map.of(), true)) {
admin.describeAcls(AclBindingFilter.ANY).values().get();