This is an automated email from the ASF dual-hosted git repository.

soarez pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6a1c8b465b4 KAFKA-18447 Support SASL_SSL protocol with 
java.security.auth.login.config (#20376)
6a1c8b465b4 is described below

commit 6a1c8b465b4dfda4b91897ab8b5908da6b469c91
Author: Ken Huang <[email protected]>
AuthorDate: Sat May 9 04:17:26 2026 +0800

    KAFKA-18447 Support SASL_SSL protocol with java.security.auth.login.config 
(#20376)
    
    The new test framework does not yet support the SASL_SSL security
    protocol; we should add support for it.
    
    Reviewers: Igor Soarez <[email protected]>, Chia-Ping Tsai
     <[email protected]>
---
 build.gradle                                       |  1 +
 .../apache/kafka/common/test/ClusterInstance.java  | 10 +--
 .../org/apache/kafka/common/test/JaasUtils.java    |  2 +-
 .../kafka/common/test/KafkaClusterTestKit.java     | 42 +++++++++---
 .../org/apache/kafka/common/test/SslManager.java   | 77 ++++++++++++++++++++++
 .../org/apache/kafka/common/test/TestKitNodes.java |  6 +-
 .../org/apache/kafka/common/test/TestUtils.java    | 10 ---
 .../test/junit/RaftClusterInvocationContext.java   | 27 ++++++++
 .../apache/kafka/common/test/TestKitNodeTest.java  |  6 +-
 .../test/junit/ClusterTestExtensionsTest.java      | 32 +++++++++
 10 files changed, 183 insertions(+), 30 deletions(-)

diff --git a/build.gradle b/build.gradle
index be1547d4d51..0f90b9f2298 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1666,6 +1666,7 @@ project(':test-common:test-common-runtime') {
     implementation libs.junitJupiter
     implementation libs.jacksonDataformatYaml
     implementation libs.slf4jApi
+    implementation testFixtures(project(':clients'))
 
     testImplementation libs.junitJupiter
     testImplementation libs.mockitoCore
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
index 0fda41dfc9d..f41fee0ff4f 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/ClusterInstance.java
@@ -150,7 +150,7 @@ public interface ClusterInstance {
         props.putIfAbsent(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         props.putIfAbsent(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
ByteArraySerializer.class.getName());
         props.putIfAbsent(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers());
-        return new KafkaProducer<>(setClientSaslConfig(props));
+        return new 
KafkaProducer<>(setClientSaslConfig(setClientSslConfig(props)));
     }
 
     default <K, V> Producer<K, V> producer() {
@@ -164,7 +164,7 @@ public interface ClusterInstance {
         props.putIfAbsent(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + 
TestUtils.randomString(5));
         props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers());
-        return new KafkaConsumer<>(setClientSaslConfig(props));
+        return new 
KafkaConsumer<>(setClientSaslConfig(setClientSslConfig(props)));
     }
 
     default <K, V> Consumer<K, V> consumer() {
@@ -189,7 +189,7 @@ public interface ClusterInstance {
         }
         props.putIfAbsent(ConsumerConfig.GROUP_ID_CONFIG, "group_" + 
TestUtils.randomString(5));
         props.putIfAbsent(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers());
-        return new KafkaShareConsumer<>(setClientSaslConfig(props), 
keyDeserializer, valueDeserializer);
+        return new 
KafkaShareConsumer<>(setClientSaslConfig(setClientSslConfig(props)), 
keyDeserializer, valueDeserializer);
     }
 
     default Admin admin(Map<String, Object> configs, boolean 
usingBootstrapControllers) {
@@ -201,7 +201,7 @@ public interface ClusterInstance {
             props.putIfAbsent(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers());
             props.remove(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG);
         }
-        return Admin.create(setClientSaslConfig(props));
+        return Admin.create(setClientSaslConfig(setClientSslConfig(props)));
     }
 
     default Map<String, Object> setClientSaslConfig(Map<String, Object> 
configs) {
@@ -220,6 +220,8 @@ public interface ClusterInstance {
         return props;
     }
 
+    Map<String, Object> setClientSslConfig(Map<String, Object> configs);
+
     default Admin admin(Map<String, Object> configs) {
         return admin(configs, false);
     }
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/JaasUtils.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/JaasUtils.java
index fb319d6980c..2c658c94805 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/JaasUtils.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/JaasUtils.java
@@ -47,7 +47,7 @@ public class JaasUtils {
     public static final String KAFKA_PLAIN_ADMIN_PASSWORD = 
"plain-admin-secret";
 
     public static File writeJaasContextsToFile(Set<JaasSection> jaasSections) 
throws IOException {
-        File jaasFile = TestUtils.tempFile();
+        File jaasFile = org.apache.kafka.test.TestUtils.tempFile();
         try (FileOutputStream fileStream = new FileOutputStream(jaasFile);
              OutputStreamWriter writer = new OutputStreamWriter(fileStream, 
StandardCharsets.UTF_8)) {
             writer.write(String.join("", 
jaasSections.stream().map(Object::toString).toArray(String[]::new)));
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
index b152a3b1dbe..fa498e190ea 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/KafkaClusterTestKit.java
@@ -143,7 +143,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
             return this;
         }
 
-        private KafkaConfig createNodeConfig(TestKitNode node) throws 
IOException {
+        private KafkaConfig createNodeConfig(TestKitNode node, Map<String, 
Object> sslConfig) throws IOException {
             TestKitNode brokerNode = nodes.brokerNodes().get(node.id());
             TestKitNode controllerNode = 
nodes.controllerNodes().get(node.id());
 
@@ -151,13 +151,13 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
             props.put(KRaftConfigs.SERVER_MAX_STARTUP_TIME_MS_CONFIG,
                     Long.toString(TimeUnit.MINUTES.toMillis(10)));
             props.put(KRaftConfigs.PROCESS_ROLES_CONFIG, roles(node.id()));
-            props.put(KRaftConfigs.NODE_ID_CONFIG,
-                    Integer.toString(node.id()));
+            props.put(KRaftConfigs.NODE_ID_CONFIG, 
Integer.toString(node.id()));
+
             // In combined mode, always prefer the metadata log directory of 
the controller node.
             if (controllerNode != null) {
                 props.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG,
                         controllerNode.metadataDirectory());
-                setSecurityProtocolProps(props, controllerSecurityProtocol);
+                setSecurityProtocolProps(props, controllerSecurityProtocol, 
sslConfig);
             } else {
                 props.put(MetadataLogConfig.METADATA_LOG_DIR_CONFIG,
                         node.metadataDirectory());
@@ -166,7 +166,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 // Set the log.dirs according to the broker node setting (if 
there is a broker node)
                 props.put(LOG_DIRS_CONFIG,
                         String.join(",", brokerNode.logDataDirectories()));
-                setSecurityProtocolProps(props, brokerSecurityProtocol);
+                setSecurityProtocolProps(props, brokerSecurityProtocol, 
sslConfig);
             } else {
                 // Set log.dirs equal to the metadata directory if there is 
just a controller.
                 props.put(LOG_DIRS_CONFIG,
@@ -228,7 +228,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
             return new KafkaConfig(props, false);
         }
 
-        private void setSecurityProtocolProps(Map<String, Object> props, 
String securityProtocol) {
+        private void setSecurityProtocolProps(
+            Map<String, Object> props, 
+            String securityProtocol,
+            Map<String, Object> sslConfig
+        ) {
             if (securityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) 
{
                 
props.putIfAbsent(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
"PLAIN");
                 
props.putIfAbsent(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
 "PLAIN");
@@ -236,11 +240,20 @@ public class KafkaClusterTestKit implements AutoCloseable 
{
                 props.putIfAbsent(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, 
StandardAuthorizer.class.getName());
                 
props.putIfAbsent(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, 
"false");
                 props.putIfAbsent(StandardAuthorizer.SUPER_USERS_CONFIG, 
"User:" + JaasUtils.KAFKA_PLAIN_ADMIN);
+            } else if 
(securityProtocol.equals(SecurityProtocol.SASL_SSL.name)) {
+                
props.putIfAbsent(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG, 
"PLAIN");
+                
props.putIfAbsent(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
 "PLAIN");
+                
props.putIfAbsent(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG, 
"PLAIN");
+                props.putIfAbsent(ServerConfigs.AUTHORIZER_CLASS_NAME_CONFIG, 
StandardAuthorizer.class.getName());
+                
props.putIfAbsent(StandardAuthorizer.ALLOW_EVERYONE_IF_NO_ACL_IS_FOUND_CONFIG, 
"false");
+                props.putIfAbsent(StandardAuthorizer.SUPER_USERS_CONFIG, 
"User:" + JaasUtils.KAFKA_PLAIN_ADMIN);
+                sslConfig.forEach(props::putIfAbsent);
             }
         }
 
         private Optional<File> maybeSetupJaasFile() throws Exception {
-            if 
(brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name)) {
+            if 
(brokerSecurityProtocol.equals(SecurityProtocol.SASL_PLAINTEXT.name) ||
+                    
brokerSecurityProtocol.equals(SecurityProtocol.SASL_SSL.name)) {
                 File file = JaasUtils.writeJaasContextsToFile(Set.of(
                     new 
JaasUtils.JaasSection(JaasUtils.KAFKA_SERVER_CONTEXT_NAME,
                         List.of(
@@ -272,6 +285,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
             Map<Integer, SharedServer> jointServers = new HashMap<>();
             File baseDirectory = null;
             Optional<File> jaasFile = maybeSetupJaasFile();
+            SslManager sslManager = new SslManager();
+            Map<String, Object> sslConfig = sslManager.createSslConfig();
             try {
                 baseDirectory = new File(nodes.baseDirectory());
                 for (TestKitNode node : nodes.controllerNodes().values()) {
@@ -282,7 +297,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 }
                 for (TestKitNode node : nodes.controllerNodes().values()) {
                     setupNodeDirectories(baseDirectory, 
node.metadataDirectory(), List.of());
-                    KafkaConfig config = createNodeConfig(node);
+                    KafkaConfig config = createNodeConfig(node, sslConfig);
                     SharedServer sharedServer = new SharedServer(
                         config,
                         node.initialMetaPropertiesEnsemble(),
@@ -310,7 +325,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 for (TestKitNode node : nodes.brokerNodes().values()) {
                     SharedServer sharedServer = jointServers.get(node.id());
                     if (sharedServer == null) {
-                        KafkaConfig config = createNodeConfig(node);
+                        KafkaConfig config = createNodeConfig(node, sslConfig);
                         sharedServer = new SharedServer(
                             config,
                             node.initialMetaPropertiesEnsemble(),
@@ -354,6 +369,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     faultHandlerFactory,
                     socketFactoryManager,
                     jaasFile,
+                    sslManager,
                     standalone,
                     initialVoterSet,
                     deleteOnClose);
@@ -401,6 +417,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
     private final PreboundSocketFactoryManager socketFactoryManager;
     private final String controllerListenerName;
     private final Optional<File> jaasFile;
+    private final SslManager sslManager;
     private final boolean standalone;
     private final Optional<Map<Integer, Uuid>> initialVoterSet;
     private final boolean deleteOnClose;
@@ -413,6 +430,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
         SimpleFaultHandlerFactory faultHandlerFactory,
         PreboundSocketFactoryManager socketFactoryManager,
         Optional<File> jaasFile,
+        SslManager sslManager,
         boolean standalone,
         Optional<Map<Integer, Uuid>> initialVoterSet,
         boolean deleteOnClose
@@ -432,6 +450,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
         this.socketFactoryManager = socketFactoryManager;
         this.controllerListenerName = nodes.controllerListenerName().value();
         this.jaasFile = jaasFile;
+        this.sslManager = sslManager;
         this.standalone = standalone;
         this.initialVoterSet = initialVoterSet;
         this.deleteOnClose = deleteOnClose;
@@ -695,6 +714,10 @@ public class KafkaClusterTestKit implements AutoCloseable {
         return faultHandlerFactory.nonFatalFaultHandler();
     }
 
+    public SslManager sslManager() {
+        return sslManager;
+    }
+    
     @Override
     public void close() throws Exception {
         List<Entry<String, Future<?>>> futureEntries = new ArrayList<>();
@@ -722,6 +745,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 if (jaasFile.isPresent()) {
                     Utils.delete(jaasFile.get());
                 }
+                sslManager.close();
             }
         } catch (Exception e) {
             for (Entry<String, Future<?>> entry : futureEntries) {
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/SslManager.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/SslManager.java
new file mode 100644
index 00000000000..bbb537c360c
--- /dev/null
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/SslManager.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.test;
+
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.network.ConnectionMode;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestSslUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.security.GeneralSecurityException;
+import java.util.Map;
+
+public class SslManager {
+
+    private static final Logger log = 
LoggerFactory.getLogger(SslManager.class);
+    private File keyStoreFile;
+    private final File trustStoreFile;
+
+    public SslManager() {
+        try {
+            trustStoreFile = 
org.apache.kafka.test.TestUtils.tempFile("kafka.server.truststore", ".jks");
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create truststore file", e);
+        }
+    }
+
+    public Map<String, Object> createSslConfig() {
+        try {
+            Map<String, Object> config = new 
TestSslUtils.SslConfigsBuilder(ConnectionMode.SERVER)
+                .createNewTrustStore(trustStoreFile)
+                .build();
+            keyStoreFile = new File((String) 
config.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG));
+            log.info("Created unified SSL config - KeyStore: {}, TrustStore: 
{}", keyStoreFile.getPath(), trustStoreFile.getPath());
+            return config;
+        } catch (IOException | GeneralSecurityException e) {
+            throw new RuntimeException("Failed to create SSL config", e);
+        }
+    }
+
+    public Map<String, Object> createClientSslConfig() {
+        try {
+            return new TestSslUtils.SslConfigsBuilder(ConnectionMode.CLIENT)
+                .useExistingTrustStore(trustStoreFile)
+                .build();
+        } catch (IOException | GeneralSecurityException e) {
+            throw new RuntimeException("Failed to create client SSL config", 
e);
+        }
+    }
+
+    public void close() throws IOException {
+        if (keyStoreFile != null) {
+            Utils.delete(keyStoreFile);
+        }
+        if (trustStoreFile != null) {
+            Utils.delete(trustStoreFile);
+        }
+    }
+}
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
index 30c597915c1..0de47d9b771 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestKitNodes.java
@@ -156,9 +156,9 @@ public class TestKitNodes {
                 throw new IllegalArgumentException("Invalid value for 
numDisksPerBroker");
             }
             // TODO: remove this assertion after 
https://issues.apache.org/jira/browse/KAFKA-16680 is finished
-            if ((brokerSecurityProtocol != SecurityProtocol.PLAINTEXT && 
brokerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT) ||
-                (controllerSecurityProtocol != SecurityProtocol.PLAINTEXT && 
controllerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT)) {
-                throw new IllegalArgumentException("Currently only support 
PLAINTEXT / SASL_PLAINTEXT security protocol");
+            if ((brokerSecurityProtocol != SecurityProtocol.PLAINTEXT && 
brokerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT && 
brokerSecurityProtocol != SecurityProtocol.SASL_SSL) ||
+                (controllerSecurityProtocol != SecurityProtocol.PLAINTEXT && 
controllerSecurityProtocol != SecurityProtocol.SASL_PLAINTEXT && 
brokerSecurityProtocol != SecurityProtocol.SASL_SSL)) {
+                throw new IllegalArgumentException("Currently only support 
PLAINTEXT / SASL_PLAINTEXT / SASL_SSL security protocol");
             }
             if (baseDirectory == null) {
                 this.baseDirectory = TestUtils.tempDirectory().toPath();
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
index 4449145c0b5..16ed515ccd0 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
@@ -50,16 +50,6 @@ class TestUtils {
     private static final long DEFAULT_POLL_INTERVAL_MS = 100;
     private static final long DEFAULT_MAX_WAIT_MS = 15_000;
 
-    /**
-     * Create an empty file in the default temporary-file directory, using 
`kafka` as the prefix and `tmp` as the
-     * suffix to generate its name.
-     */
-    public static File tempFile() throws IOException {
-        final File file = Files.createTempFile("kafka", ".tmp").toFile();
-        file.deleteOnExit();
-        return file;
-    }
-
     /**
      * Generate a random string of letters and digits of the given length
      *
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
index 704d4a7a563..f1ad8080cb4 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/junit/RaftClusterInvocationContext.java
@@ -20,8 +20,13 @@ import kafka.server.BrokerServer;
 import kafka.server.ControllerServer;
 import kafka.server.KafkaBroker;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.test.ClusterInstance;
+import org.apache.kafka.common.test.JaasUtils;
 import org.apache.kafka.common.test.KafkaClusterTestKit;
 import org.apache.kafka.common.test.TestKitNodes;
 import org.apache.kafka.common.test.api.ClusterConfig;
@@ -41,6 +46,7 @@ import org.junit.jupiter.api.extension.Extension;
 import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -175,6 +181,27 @@ public class RaftClusterInvocationContext implements 
TestTemplateInvocationConte
                 .orElseThrow(() -> new RuntimeException("No controllers or 
brokers!"));
         }
 
+        @Override
+        public Map<String, Object> setClientSslConfig(Map<String, Object> 
configs) {
+            Map<String, Object> props = new HashMap<>(configs);
+            if (config().brokerSecurityProtocol() == 
SecurityProtocol.SASL_SSL) {
+                
props.putIfAbsent(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
SecurityProtocol.SASL_SSL.name);
+                props.putIfAbsent(SaslConfigs.SASL_MECHANISM, "PLAIN");
+                props.putIfAbsent(
+                    SaslConfigs.SASL_JAAS_CONFIG,
+                    String.format(
+                        
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=\"%s\" password=\"%s\";",
+                        JaasUtils.KAFKA_PLAIN_ADMIN, 
JaasUtils.KAFKA_PLAIN_ADMIN_PASSWORD
+                    )
+                );
+                if (clusterTestKit.sslManager() != null) {
+                    
props.putAll(clusterTestKit.sslManager().createClientSslConfig());
+                    
props.put(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, "");
+                }
+            }
+            return props;
+        }
+
         @Override
         public Type type() {
             return isCombined ? Type.CO_KRAFT : Type.KRAFT;
diff --git 
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
 
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
index 7b1caf3383a..4a9dcaf9e5e 100644
--- 
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
+++ 
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/TestKitNodeTest.java
@@ -32,11 +32,11 @@ public class TestKitNodeTest {
     @ParameterizedTest
     @EnumSource(SecurityProtocol.class)
     public void testSecurityProtocol(SecurityProtocol securityProtocol) {
-        if (securityProtocol != SecurityProtocol.PLAINTEXT && securityProtocol 
!= SecurityProtocol.SASL_PLAINTEXT) {
-            assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT 
security protocol",
+        if (securityProtocol != SecurityProtocol.PLAINTEXT && securityProtocol 
!= SecurityProtocol.SASL_PLAINTEXT && securityProtocol != 
SecurityProtocol.SASL_SSL) {
+            assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT / 
SASL_SSL security protocol",
                 assertThrows(IllegalArgumentException.class,
                     () -> new 
TestKitNodes.Builder().setBrokerSecurityProtocol(securityProtocol).build()).getMessage());
-            assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT 
security protocol",
+            assertEquals("Currently only support PLAINTEXT / SASL_PLAINTEXT / 
SASL_SSL security protocol",
                 assertThrows(IllegalArgumentException.class,
                     () -> new 
TestKitNodes.Builder().setControllerSecurityProtocol(securityProtocol).build()).getMessage());
         }
diff --git 
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
 
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
index ac6c05e1151..e2862938f82 100644
--- 
a/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
+++ 
b/test-common/test-common-runtime/src/test/java/org/apache/kafka/common/test/junit/ClusterTestExtensionsTest.java
@@ -33,6 +33,7 @@ import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.config.ConfigResource;
@@ -77,6 +78,8 @@ import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import scala.jdk.javaapi.CollectionConverters;
+
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
 import static org.apache.kafka.clients.consumer.GroupProtocol.CLASSIC;
@@ -526,6 +529,35 @@ public class ClusterTestExtensionsTest {
         }
     )
     public void testSaslPlaintextWithController(ClusterInstance 
clusterInstance) throws CancellationException, ExecutionException, 
InterruptedException {
+        assertSecurityProtocol(clusterInstance, 
SecurityProtocol.SASL_PLAINTEXT, "Expected broker to have SASL_PLAINTEXT 
data-plane listener");
+        testSecurityProtocol(clusterInstance);
+    }
+
+    @ClusterTest(
+        types = {Type.KRAFT, Type.CO_KRAFT},
+        brokerSecurityProtocol = SecurityProtocol.SASL_SSL,
+        controllerSecurityProtocol = SecurityProtocol.SASL_SSL,
+        serverProperties = {
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG, value = "1"),
+            @ClusterConfigProperty(key = 
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, value = "1")
+        }
+    )
+    public void testSaslSslWithController(ClusterInstance clusterInstance) 
throws CancellationException, ExecutionException, InterruptedException {
+        assertSecurityProtocol(clusterInstance, SecurityProtocol.SASL_SSL, 
"Expected broker to have SASL_SSL data-plane listener");
+        testSecurityProtocol(clusterInstance);
+    }
+
+    private static void assertSecurityProtocol(ClusterInstance 
clusterInstance, SecurityProtocol saslPlaintext, String message) {
+        clusterInstance.aliveBrokers().values().forEach(broker -> {
+            List<Endpoint> endpoints = 
CollectionConverters.asJava(broker.config().dataPlaneListeners());
+            assertTrue(
+                    endpoints.stream().anyMatch(ep -> ep.securityProtocol() == 
saslPlaintext),
+                    message
+            );
+        });
+    }
+
+    private static void testSecurityProtocol(ClusterInstance clusterInstance) 
throws InterruptedException, ExecutionException {
         // default ClusterInstance#admin helper with admin credentials
         try (Admin admin = clusterInstance.admin(Map.of(), true)) {
             admin.describeAcls(AclBindingFilter.ANY).values().get();

Reply via email to