This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b4eb7cf390e KAFKA-16683 Extract security-related helpers from 
scala.TestUtils to java class (#16912)
b4eb7cf390e is described below

commit b4eb7cf390e77584c8a0c08955f496552e6725a8
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Sep 27 03:02:11 2024 +0800

    KAFKA-16683 Extract security-related helpers from scala.TestUtils to java 
class (#16912)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../test/java/kafka/security/JaasTestUtils.java    | 108 +++++++++++++++++++++
 .../kafka/api/BaseProducerSendTest.scala           |  10 +-
 .../api/DescribeAuthorizedOperationsTest.scala     |   3 +-
 .../kafka/api/IntegrationTestHarness.scala         |   6 +-
 .../SaslClientsWithInvalidCredentialsTest.scala    |   4 +-
 .../SaslPlainSslEndToEndAuthorizationTest.scala    |  11 ++-
 .../scala/integration/kafka/api/SaslSetup.scala    |   3 +-
 .../kafka/api/SslAdminIntegrationTest.scala        |  11 ++-
 .../kafka/api/SslEndToEndAuthorizationTest.scala   |  15 +--
 .../server/DynamicBrokerReconfigurationTest.scala  |   6 +-
 .../kafka/server/IntegrationTestUtils.scala        |  10 +-
 ...ListenersWithSameSecurityProtocolBaseTest.scala |   8 +-
 .../DelegationTokenRequestsOnPlainTextTest.scala   |   7 +-
 .../kafka/server/DelegationTokenRequestsTest.scala |   3 +-
 ...nTokenRequestsWithDisableTokenFeatureTest.scala |   4 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  89 ++---------------
 16 files changed, 175 insertions(+), 123 deletions(-)

diff --git a/core/src/test/java/kafka/security/JaasTestUtils.java 
b/core/src/test/java/kafka/security/JaasTestUtils.java
index e80208b77d2..73e8a7245bc 100644
--- a/core/src/test/java/kafka/security/JaasTestUtils.java
+++ b/core/src/test/java/kafka/security/JaasTestUtils.java
@@ -18,9 +18,13 @@ package kafka.security;
 
 import kafka.utils.TestUtils;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.ScramMechanism;
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.ConnectionMode;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.Java;
+import org.apache.kafka.test.TestSslUtils;
 
 import java.io.BufferedWriter;
 import java.io.File;
@@ -103,6 +107,8 @@ public class JaasTestUtils {
 
     public static final String SERVICE_NAME = "kafka";
 
+    public static final String SSL_CERTIFICATE_CN = "localhost";
+
     public static Properties saslConfigs(Optional<Properties> saslProperties) {
         Properties result = saslProperties.orElse(new Properties());
         if (IS_IBM_SECURITY && 
!result.containsKey(SaslConfigs.SASL_KERBEROS_SERVICE_NAME)) {
@@ -269,4 +275,106 @@ public class JaasTestUtils {
             writer.write(String.join("", 
jaasSections.stream().map(Object::toString).toArray(String[]::new)));
         }
     }
+
+    public static boolean usesSslTransportLayer(SecurityProtocol 
securityProtocol) {
+        switch (securityProtocol) {
+            case SSL:
+            case SASL_SSL:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    public static boolean usesSaslAuthentication(SecurityProtocol 
securityProtocol) {
+        switch (securityProtocol) {
+            case SASL_PLAINTEXT:
+            case SASL_SSL:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    public static Properties sslConfigs(ConnectionMode mode,
+                                        boolean clientCert,
+                                        Optional<File> trustStoreFile,
+                                        String certAlias) throws Exception {
+        return sslConfigs(mode, clientCert, trustStoreFile, certAlias, 
SSL_CERTIFICATE_CN, TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS);
+    }
+
+    public static Properties sslConfigs(ConnectionMode mode,
+                                        boolean clientCert,
+                                        Optional<File> trustStoreFile,
+                                        String certAlias,
+                                        String certCn,
+                                        String tlsProtocol) throws Exception {
+        File trustStore = trustStoreFile.orElseThrow(() -> new Exception("SSL 
enabled but no trustStoreFile provided"));
+        Properties sslProps = new Properties();
+        sslProps.putAll(new TestSslUtils.SslConfigsBuilder(mode)
+                .useClientCert(clientCert)
+                .createNewTrustStore(trustStore)
+                .certAlias(certAlias)
+                .cn(certCn)
+                .tlsProtocol(tlsProtocol)
+                .build());
+        return sslProps;
+    }
+
+    public static Properties producerSecurityConfigs(SecurityProtocol 
securityProtocol,
+                                                     Optional<File> 
trustStoreFile,
+                                                     Optional<Properties> 
saslProperties) throws Exception {
+        return securityConfigs(ConnectionMode.CLIENT, securityProtocol, 
trustStoreFile, "producer", SSL_CERTIFICATE_CN, saslProperties);
+    }
+
+    public static Properties consumerSecurityConfigs(SecurityProtocol 
securityProtocol, Optional<File> trustStoreFile, Optional<Properties> 
saslProperties) throws Exception {
+        return securityConfigs(ConnectionMode.CLIENT, securityProtocol, 
trustStoreFile, "consumer", SSL_CERTIFICATE_CN, saslProperties);
+    }
+
+    public static Properties adminClientSecurityConfigs(SecurityProtocol 
securityProtocol, Optional<File> trustStoreFile, Optional<Properties> 
saslProperties) throws Exception {
+        return securityConfigs(ConnectionMode.CLIENT, securityProtocol, 
trustStoreFile, "admin-client", SSL_CERTIFICATE_CN, saslProperties);
+    }
+
+    public static Properties securityConfigs(ConnectionMode connectionMode,
+                                             SecurityProtocol securityProtocol,
+                                             Optional<File> trustStoreFile,
+                                             String certAlias,
+                                             String certCn,
+                                             Optional<Properties> 
saslProperties) throws Exception {
+        return securityConfigs(connectionMode, securityProtocol, 
trustStoreFile, certAlias, certCn, saslProperties,
+                TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS, Optional.empty());
+    }
+    /**
+     * Returns security configuration options for broker or clients
+     *
+     * @param connectionMode Client or server mode
+     * @param securityProtocol Security protocol which indicates if SASL or 
SSL or both configs are included
+     * @param trustStoreFile Trust store file must be provided for SSL and 
SASL_SSL
+     * @param certAlias Alias of certificate in SSL key store
+     * @param certCn CN for certificate
+     * @param saslProperties SASL configs if security protocol is SASL_SSL or 
SASL_PLAINTEXT
+     * @param tlsProtocol TLS version
+     * @param needsClientCert If not empty, a flag which indicates if client 
certificates are required. By default,
+     *                        client certificates are generated only if 
securityProtocol is SSL (not for SASL_SSL).
+     */
+    public static Properties securityConfigs(ConnectionMode connectionMode,
+                                             SecurityProtocol securityProtocol,
+                                             Optional<File> trustStoreFile,
+                                             String certAlias,
+                                             String certCn,
+                                             Optional<Properties> 
saslProperties,
+                                             String tlsProtocol,
+                                             Optional<Boolean> 
needsClientCert) throws Exception {
+        Properties props = new Properties();
+        if (usesSslTransportLayer(securityProtocol)) {
+            boolean addClientCert = needsClientCert.orElse(securityProtocol == 
SecurityProtocol.SSL);
+            props.putAll(sslConfigs(connectionMode, addClientCert, 
trustStoreFile, certAlias, certCn, tlsProtocol));
+        }
+
+        if (usesSaslAuthentication(securityProtocol)) {
+            props.putAll(saslConfigs(saslProperties));
+        }
+        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
securityProtocol.name());
+        return props;
+    }
 }
diff --git 
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 6e8341d7358..2bf6c722cb5 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets
 import java.util.{Collections, Properties}
 import java.util.concurrent.TimeUnit
 import kafka.integration.KafkaServerTestHarness
+import kafka.security.JaasTestUtils
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{Admin, NewPartitions}
@@ -29,7 +30,7 @@ import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.network.{ListenerName, ConnectionMode}
+import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -40,6 +41,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
 import scala.collection.mutable
+import scala.compat.java8.OptionConverters
 import scala.concurrent.ExecutionException
 import scala.jdk.CollectionConverters._
 
@@ -70,12 +72,12 @@ abstract class BaseProducerSendTest extends 
KafkaServerTestHarness {
     super.setUp(testInfo)
 
     admin = TestUtils.createAdminClient(brokers, listenerName,
-        TestUtils.securityConfigs(ConnectionMode.CLIENT,
+        JaasTestUtils.securityConfigs(ConnectionMode.CLIENT,
           securityProtocol,
-          trustStoreFile,
+          OptionConverters.toJava(trustStoreFile),
           "adminClient",
           TestUtils.SslCertificateCn,
-          clientSaslProperties))
+          OptionConverters.toJava(clientSaslProperties)))
 
     consumer = TestUtils.createConsumer(
       bootstrapServers(listenerName = 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
diff --git 
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
 
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index a9c54039750..7a778a12e03 100644
--- 
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.server.config.{ServerConfigs, 
ZkConfigs}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
 
+import scala.compat.java8.OptionConverters
 import scala.jdk.CollectionConverters._
 
 object DescribeAuthorizedOperationsTest {
@@ -126,7 +127,7 @@ class DescribeAuthorizedOperationsTest extends 
IntegrationTestHarness with SaslS
     adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 
bootstrapServers())
     adminClientConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
     val securityProps: util.Map[Object, Object] =
-      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, 
clientSaslProperties)
+      JaasTestUtils.adminClientSecurityConfigs(securityProtocol, 
OptionConverters.toJava(trustStoreFile), 
OptionConverters.toJava(clientSaslProperties))
     adminClientConfig.putAll(securityProps)
     adminClientConfig
   }
diff --git 
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 3d58441c8d4..8f6271c8e5c 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -26,6 +26,7 @@ import java.util.Properties
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
 import kafka.server.KafkaConfig
 import kafka.integration.KafkaServerTestHarness
+import kafka.security.JaasTestUtils
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
 import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, 
ByteArraySerializer, Deserializer, Serializer}
@@ -36,6 +37,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 
 import scala.collection.mutable
 import scala.collection.Seq
+import scala.compat.java8.OptionConverters
 
 /**
  * A helper class for writing integration tests that involve producers, 
consumers, and servers
@@ -170,8 +172,8 @@ abstract class IntegrationTestHarness extends 
KafkaServerTestHarness {
   }
 
   def clientSecurityProps(certAlias: String): Properties = {
-    TestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol, 
trustStoreFile, certAlias, TestUtils.SslCertificateCn,
-      clientSaslProperties)
+    JaasTestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol, 
OptionConverters.toJava(trustStoreFile), certAlias,
+      JaasTestUtils.SSL_CERTIFICATE_CN, 
OptionConverters.toJava(clientSaslProperties))
   }
 
   def superuserSecurityProps(certAlias: String): Properties = {
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 7190c939b2f..d99c8d41640 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -32,6 +32,8 @@ import 
org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import scala.compat.java8.OptionConverters
+
 class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
   private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
   private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
@@ -141,7 +143,7 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
 
   @Test
   def testKafkaAdminClientWithAuthenticationFailure(): Unit = {
-    val props = TestUtils.adminClientSecurityConfigs(securityProtocol, 
trustStoreFile, clientSaslProperties)
+    val props = JaasTestUtils.adminClientSecurityConfigs(securityProtocol, 
OptionConverters.toJava(trustStoreFile), 
OptionConverters.toJava(clientSaslProperties))
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
     val adminClient = Admin.create(props)
 
diff --git 
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
 
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index aa85d9eb531..5f1226adcea 100644
--- 
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -16,7 +16,7 @@
   */
 package kafka.api
 
-import kafka.security.JaasModule
+import kafka.security.{JaasModule, JaasTestUtils}
 import kafka.security.JaasTestUtils._
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils.isAclSecure
@@ -27,15 +27,17 @@ import org.apache.kafka.common.network.ConnectionMode
 import org.apache.kafka.common.security.auth._
 import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
 import org.apache.kafka.common.security.plain.PlainAuthenticateCallback
+import org.apache.kafka.test.TestSslUtils
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
 import org.junit.jupiter.api.Test
 
 import java.security.AccessController
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
 import javax.security.auth.Subject
 import javax.security.auth.callback._
 import javax.security.auth.login.AppConfigurationEntry
 import scala.collection.Seq
+import scala.compat.java8.OptionConverters
 import scala.jdk.CollectionConverters._
 
 object SaslPlainSslEndToEndAuthorizationTest {
@@ -141,8 +143,9 @@ class SaslPlainSslEndToEndAuthorizationTest extends 
SaslEndToEndAuthorizationTes
   // Generate SSL certificates for clients since we are enabling TLS mutual 
authentication
   // in this test for the SASL_SSL listener.
   override def clientSecurityProps(certAlias: String): Properties = {
-    TestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol, 
trustStoreFile, certAlias, TestUtils.SslCertificateCn,
-      clientSaslProperties, needsClientCert = Some(true))
+    JaasTestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol, 
OptionConverters.toJava(trustStoreFile),
+      certAlias, JaasTestUtils.SSL_CERTIFICATE_CN, 
OptionConverters.toJava(clientSaslProperties),
+      TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS, Optional.of(true))
   }
 
   /**
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala 
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 1a4e6a5425a..25cd86cab53 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -39,6 +39,7 @@ import java.util
 import java.util.Properties
 import javax.security.auth.login.Configuration
 import scala.collection.Seq
+import scala.compat.java8.OptionConverters
 import scala.jdk.CollectionConverters._
 import scala.jdk.OptionConverters._
 import scala.util.Using
@@ -176,7 +177,7 @@ trait SaslSetup {
     val config = new util.HashMap[String, Object]
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     val securityProps: util.Map[Object, Object] =
-      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, 
clientSaslProperties)
+      JaasTestUtils.adminClientSecurityConfigs(securityProtocol, 
OptionConverters.toJava(trustStoreFile), 
OptionConverters.toJava(clientSaslProperties))
     securityProps.forEach { (key, value) => 
config.put(key.asInstanceOf[String], value) }
     config.put(SaslConfigs.SASL_JAAS_CONFIG, 
jaasScramClientLoginModule(scramMechanism, user, password))
     Admin.create(config)
diff --git 
a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
index fdaafce2659..bc6bd972f3a 100644
--- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
@@ -15,8 +15,8 @@ package kafka.api
 import java.util
 import java.util.concurrent._
 import java.util.Properties
-
 import com.yammer.metrics.core.Gauge
+import kafka.security.JaasTestUtils
 import kafka.security.authorizer.AclAuthorizer
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.CreateAclsResult
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.{AfterEach, Test}
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
+import scala.compat.java8.OptionConverters
 
 object SslAdminIntegrationTest {
   @volatile var semaphore: Option[Semaphore] = None
@@ -284,16 +285,16 @@ class SslAdminIntegrationTest extends 
SaslSslAdminIntegrationTest {
 
   // Override the CN to create a principal based on it
   override def superuserSecurityProps(certAlias: String): Properties = {
-    val props = TestUtils.securityConfigs(ConnectionMode.CLIENT, 
securityProtocol, trustStoreFile, certAlias, 
SslAdminIntegrationTest.superuserCn,
-      clientSaslProperties)
+    val props = JaasTestUtils.securityConfigs(ConnectionMode.CLIENT, 
securityProtocol, OptionConverters.toJava(trustStoreFile),
+      certAlias, SslAdminIntegrationTest.superuserCn, 
OptionConverters.toJava(clientSaslProperties))
     props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
     props
   }
 
   // Override the CN to create a principal based on it
   override def clientSecurityProps(certAlias: String): Properties = {
-    val props = TestUtils.securityConfigs(ConnectionMode.CLIENT, 
securityProtocol, trustStoreFile, certAlias, SslAdminIntegrationTest.clientCn,
-      clientSaslProperties)
+    val props = JaasTestUtils.securityConfigs(ConnectionMode.CLIENT, 
securityProtocol, OptionConverters.toJava(trustStoreFile),
+      certAlias, SslAdminIntegrationTest.clientCn, 
OptionConverters.toJava(clientSaslProperties))
     props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
     props
   }
diff --git 
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala 
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
index 5c5c24d36cc..5fc8295626e 100644
--- 
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
@@ -17,9 +17,9 @@
 
 package kafka.api
 
-import java.util.Properties
+import kafka.security.JaasTestUtils
 
-import kafka.utils.TestUtils
+import java.util.Properties
 import org.apache.kafka.common.config.SslConfigs
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.network.ConnectionMode
@@ -28,6 +28,9 @@ import 
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild
 import org.apache.kafka.common.utils.Java
 import org.junit.jupiter.api.{BeforeEach, TestInfo}
 
+import java.util.Optional
+import scala.compat.java8.OptionConverters
+
 object SslEndToEndAuthorizationTest {
   val superuserCn = "super-user"
 
@@ -78,16 +81,16 @@ class SslEndToEndAuthorizationTest extends 
EndToEndAuthorizationTest {
   }
 
   override def clientSecurityProps(certAlias: String): Properties = {
-    val props = TestUtils.securityConfigs(ConnectionMode.CLIENT, 
securityProtocol, trustStoreFile,
-      certAlias, clientCn, clientSaslProperties, tlsProtocol)
+    val props = JaasTestUtils.securityConfigs(ConnectionMode.CLIENT, 
securityProtocol, OptionConverters.toJava(trustStoreFile),
+      certAlias, clientCn, OptionConverters.toJava(clientSaslProperties), 
tlsProtocol, Optional.empty())
     props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
     props
   }
   // This test doesn't really care about matching the SSL certificate to a 
particular principal
   // We can override the CN and create a principal based on it or on the 
server SSL
   override def superuserSecurityProps(certAlias: String): Properties = {
-    val props = TestUtils.securityConfigs(ConnectionMode.CLIENT, 
securityProtocol, trustStoreFile,
-      certAlias, superuserCn, clientSaslProperties, tlsProtocol)
+    val props = JaasTestUtils.securityConfigs(ConnectionMode.CLIENT, 
securityProtocol, OptionConverters.toJava(trustStoreFile),
+      certAlias, superuserCn, OptionConverters.toJava(clientSaslProperties), 
tlsProtocol, Optional.empty())
     props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
     props
   }
diff --git 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 6549bc925fd..93418cf99f4 100644
--- 
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -24,7 +24,7 @@ import java.lang.management.ManagementFactory
 import java.security.KeyStore
 import java.time.Duration
 import java.util
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
 import java.util.concurrent._
 import javax.management.ObjectName
 import com.yammer.metrics.core.MetricName
@@ -107,8 +107,8 @@ class DynamicBrokerReconfigurationTest extends 
QuorumTestHarness with SaslSetup
 
   private val trustStoreFile1 = TestUtils.tempFile("truststore", ".jks")
   private val trustStoreFile2 = TestUtils.tempFile("truststore", ".jks")
-  private val sslProperties1 = TestUtils.sslConfigs(ConnectionMode.SERVER, 
clientCert = false, Some(trustStoreFile1), "kafka")
-  private val sslProperties2 = TestUtils.sslConfigs(ConnectionMode.SERVER, 
clientCert = false, Some(trustStoreFile2), "kafka")
+  private val sslProperties1 = JaasTestUtils.sslConfigs(ConnectionMode.SERVER, 
false, Optional.of(trustStoreFile1), "kafka")
+  private val sslProperties2 = JaasTestUtils.sslConfigs(ConnectionMode.SERVER, 
false, Optional.of(trustStoreFile2), "kafka")
   private val invalidSslProperties = invalidSslConfigs
 
   @BeforeEach
diff --git 
a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala 
b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
index 0623a16b388..7e357eb6e2b 100644
--- a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
@@ -20,13 +20,12 @@ package kafka.server
 import java.io.{DataInputStream, DataOutputStream}
 import java.net.Socket
 import java.nio.ByteBuffer
-import java.util.{Collections, Properties}
-
+import java.util.{Collections, Optional, Properties}
 import kafka.network.SocketServer
+import kafka.security.JaasTestUtils
 import kafka.utils.Implicits._
-import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{Admin, NewTopic}
-import org.apache.kafka.common.network.{ListenerName, ConnectionMode}
+import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
 import org.apache.kafka.common.protocol.ApiKeys
 import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse, 
RequestHeader, ResponseHeader}
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -139,6 +138,7 @@ object IntegrationTestUtils {
   }
 
   def clientSecurityProps(certAlias: String): Properties = {
-    TestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol, None, 
certAlias, TestUtils.SslCertificateCn, None) // TODO use real trust store and 
client SASL properties
+    JaasTestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol, 
Optional.empty(), certAlias,
+      JaasTestUtils.SSL_CERTIFICATE_CN, Optional.empty()) // TODO use real 
trust store and client SASL properties
   }
 }
diff --git 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 93dcce79826..28ac8b31237 100644
--- 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -18,7 +18,7 @@
 
 package kafka.server
 
-import java.util.{Collections, Objects, Properties}
+import java.util.{Collections, Objects, Optional, Properties}
 import java.util.concurrent.TimeUnit
 import kafka.api.SaslSetup
 import kafka.security.JaasTestUtils
@@ -92,7 +92,7 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
       props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka")
       props ++= dynamicJaasSections
 
-      props ++= TestUtils.sslConfigs(ConnectionMode.SERVER, clientCert = 
false, Some(trustStoreFile), s"server$brokerId")
+      props ++= JaasTestUtils.sslConfigs(ConnectionMode.SERVER, false, 
Optional.of(trustStoreFile), s"server$brokerId")
 
       // set listener-specific configs and set an invalid path for the global 
config to verify that the overrides work
       Seq(SecureInternal, SecureExternal).foreach { listenerName =>
@@ -120,7 +120,7 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
       val listenerName = endPoint.listenerName
 
       val trustStoreFile =
-        if (TestUtils.usesSslTransportLayer(endPoint.securityProtocol)) 
Some(this.trustStoreFile)
+        if (JaasTestUtils.usesSslTransportLayer(endPoint.securityProtocol)) 
Some(this.trustStoreFile)
         else None
 
       val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName)
@@ -138,7 +138,7 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
           securityProtocol = endPoint.securityProtocol, trustStoreFile = 
trustStoreFile, saslProperties = saslProps)
       }
 
-      if (TestUtils.usesSaslAuthentication(endPoint.securityProtocol)) {
+      if (JaasTestUtils.usesSaslAuthentication(endPoint.securityProtocol)) {
         kafkaServerSaslMechanisms(endPoint.listenerName.value).foreach { 
mechanism =>
           addProducerConsumer(listenerName, mechanism, 
Some(kafkaClientSaslProperties(mechanism, dynamicJaasConfig = true)))
         }
diff --git 
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
 
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
index a64792f8408..ca1d32e91b9 100644
--- 
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -16,9 +16,9 @@
   */
 package kafka.server
 
-import java.util
+import kafka.security.JaasTestUtils
 
-import kafka.utils.TestUtils
+import java.util
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
 import org.apache.kafka.common.errors.UnsupportedByAuthenticationException
 import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Assertions.assertThrows
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
+import scala.compat.java8.OptionConverters
 import scala.concurrent.ExecutionException
 
 class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
@@ -42,7 +43,7 @@ class DelegationTokenRequestsOnPlainTextTest extends 
BaseRequestTest {
     val config = new util.HashMap[String, Object]
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
     val securityProps: util.Map[Object, Object] =
-      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, 
clientSaslProperties)
+      JaasTestUtils.adminClientSecurityConfigs(securityProtocol, 
OptionConverters.toJava(trustStoreFile), 
OptionConverters.toJava(clientSaslProperties))
     securityProps.forEach { (key, value) => 
config.put(key.asInstanceOf[String], value) }
     config
   }
diff --git 
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala 
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index bf134c5a4bc..18f7b3f538e 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -30,6 +30,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
 import java.util
+import scala.compat.java8.OptionConverters
 import scala.concurrent.ExecutionException
 import scala.jdk.CollectionConverters._
 
@@ -59,7 +60,7 @@ class DelegationTokenRequestsTest extends 
IntegrationTestHarness with SaslSetup
     val config = new util.HashMap[String, Object]
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
     val securityProps: util.Map[Object, Object] =
-      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, 
clientSaslProperties)
+      JaasTestUtils.adminClientSecurityConfigs(securityProtocol, 
OptionConverters.toJava(trustStoreFile), 
OptionConverters.toJava(clientSaslProperties))
     securityProps.forEach { (key, value) => 
config.put(key.asInstanceOf[String], value) }
     config
   }
diff --git 
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
 
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
index ff662159178..a28d784c5cb 100644
--- 
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -18,7 +18,6 @@ package kafka.server
 
 import kafka.api.{KafkaSasl, SaslSetup}
 import kafka.security.JaasTestUtils
-import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
 import org.apache.kafka.common.errors.DelegationTokenDisabledException
 import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -28,6 +27,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
 import java.util
+import scala.compat.java8.OptionConverters
 import scala.concurrent.ExecutionException
 
 class DelegationTokenRequestsWithDisableTokenFeatureTest extends 
BaseRequestTest with SaslSetup {
@@ -50,7 +50,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest 
extends BaseRequestTest
     val config = new util.HashMap[String, Object]
     config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
     val securityProps: util.Map[Object, Object] =
-      TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile, 
clientSaslProperties)
+      JaasTestUtils.adminClientSecurityConfigs(securityProtocol, 
OptionConverters.toJava(trustStoreFile), 
OptionConverters.toJava(clientSaslProperties))
     securityProps.forEach { (key, value) => 
config.put(key.asInstanceOf[String], value) }
     config
   }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f503caf43c4..a833258836a 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer._
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
-import org.apache.kafka.clients.{ClientResponse, CommonClientConfigs}
+import org.apache.kafka.clients.ClientResponse
 import org.apache.kafka.common._
 import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter, AclBindingFilter}
 import org.apache.kafka.common.compress.Compression
@@ -67,7 +67,7 @@ import org.apache.kafka.server.{ClientMetricsManager, 
ControllerRequestCompletio
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
 import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig, 
LogDirFailureChannel, ProducerStateManagerConfig}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
+import org.apache.kafka.test.{TestUtils => JTestUtils}
 import org.apache.zookeeper.KeeperException.SessionExpiredException
 import org.apache.zookeeper.ZooDefs._
 import org.apache.zookeeper.data.ACL
@@ -88,6 +88,7 @@ import java.util.{Arrays, Collections, Optional, Properties}
 import scala.annotation.nowarn
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.{Map, Seq, mutable}
+import scala.compat.java8.OptionConverters
 import scala.concurrent.duration.FiniteDuration
 import scala.concurrent.{Await, ExecutionContext, Future}
 import scala.jdk.CollectionConverters._
@@ -360,10 +361,10 @@ object TestUtils extends Logging {
     props.put(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG, "2")
     props.put(ServerConfigs.BACKGROUND_THREADS_CONFIG, "2")
 
-    if (protocolAndPorts.exists { case (protocol, _) => 
usesSslTransportLayer(protocol) })
-      props ++= sslConfigs(ConnectionMode.SERVER, false, trustStoreFile, 
s"server$nodeId")
+    if (protocolAndPorts.exists { case (protocol, _) => 
JaasTestUtils.usesSslTransportLayer(protocol) })
+      props ++= JaasTestUtils.sslConfigs(ConnectionMode.SERVER, false, 
OptionConverters.toJava(trustStoreFile), s"server$nodeId")
 
-    if (protocolAndPorts.exists { case (protocol, _) => 
usesSaslAuthentication(protocol) })
+    if (protocolAndPorts.exists { case (protocol, _) => 
JaasTestUtils.usesSaslAuthentication(protocol) })
       props ++= JaasTestUtils.saslConfigs(saslProperties.toJava)
 
     interBrokerSecurityProtocol.foreach { protocol =>
@@ -646,44 +647,6 @@ object TestUtils extends Logging {
    */
   def randomBytes(numBytes: Int): Array[Byte] = 
JTestUtils.randomBytes(numBytes)
 
-  /**
-   * Returns security configuration options for broker or clients
-   *
-   * @param connectionMode Client or server mode
-   * @param securityProtocol Security protocol which indicates if SASL or SSL 
or both configs are included
-   * @param trustStoreFile Trust store file must be provided for SSL and 
SASL_SSL
-   * @param certAlias Alias of certificate in SSL key store
-   * @param certCn CN for certificate
-   * @param saslProperties SASL configs if security protocol is SASL_SSL or 
SASL_PLAINTEXT
-   * @param tlsProtocol TLS version
-   * @param needsClientCert If not empty, a flag which indicates if client 
certificates are required. By default
-   *                        client certificates are generated only if 
securityProtocol is SSL (not for SASL_SSL).
-   */
-  def securityConfigs(connectionMode: ConnectionMode,
-                      securityProtocol: SecurityProtocol,
-                      trustStoreFile: Option[File],
-                      certAlias: String,
-                      certCn: String,
-                      saslProperties: Option[Properties],
-                      tlsProtocol: String = 
TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS,
-                      needsClientCert: Option[Boolean] = None): Properties = {
-    val props = new Properties
-    if (usesSslTransportLayer(securityProtocol)) {
-      val addClientCert = needsClientCert.getOrElse(securityProtocol == 
SecurityProtocol.SSL)
-      props ++= sslConfigs(connectionMode, addClientCert, trustStoreFile, 
certAlias, certCn, tlsProtocol)
-    }
-
-    if (usesSaslAuthentication(securityProtocol))
-      props ++= JaasTestUtils.saslConfigs(saslProperties.toJava)
-    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, 
securityProtocol.name)
-    props
-  }
-
-  def producerSecurityConfigs(securityProtocol: SecurityProtocol,
-                              trustStoreFile: Option[File],
-                              saslProperties: Option[Properties]): Properties =
-    securityConfigs(ConnectionMode.CLIENT, securityProtocol, trustStoreFile, 
"producer", SslCertificateCn, saslProperties)
-
   /**
    * Create a (new) producer with a few pre-configured properties.
    */
@@ -715,26 +678,10 @@ object TestUtils extends Logging {
     producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
     producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
     producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, 
enableIdempotence.toString)
-    producerProps ++= producerSecurityConfigs(securityProtocol, 
trustStoreFile, saslProperties)
+    producerProps ++= JaasTestUtils.producerSecurityConfigs(securityProtocol, 
OptionConverters.toJava(trustStoreFile), 
OptionConverters.toJava(saslProperties))
     new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer)
   }
 
-  def usesSslTransportLayer(securityProtocol: SecurityProtocol): Boolean = 
securityProtocol match {
-    case SecurityProtocol.SSL | SecurityProtocol.SASL_SSL => true
-    case _ => false
-  }
-
-  def usesSaslAuthentication(securityProtocol: SecurityProtocol): Boolean = 
securityProtocol match {
-    case SecurityProtocol.SASL_PLAINTEXT | SecurityProtocol.SASL_SSL => true
-    case _ => false
-  }
-
-  def consumerSecurityConfigs(securityProtocol: SecurityProtocol, 
trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
-    securityConfigs(ConnectionMode.CLIENT, securityProtocol, trustStoreFile, 
"consumer", SslCertificateCn, saslProperties)
-
-  def adminClientSecurityConfigs(securityProtocol: SecurityProtocol, 
trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
-    securityConfigs(ConnectionMode.CLIENT, securityProtocol, trustStoreFile, 
"admin-client", SslCertificateCn, saslProperties)
-
   /**
    * Create a consumer with a few pre-configured properties.
    */
@@ -758,7 +705,7 @@ object TestUtils extends Logging {
     consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
enableAutoCommit.toString)
     consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 
maxPollRecords.toString)
     consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, if 
(readCommitted) "read_committed" else "read_uncommitted")
-    consumerProps ++= consumerSecurityConfigs(securityProtocol, 
trustStoreFile, saslProperties)
+    consumerProps ++= JaasTestUtils.consumerSecurityConfigs(securityProtocol, 
OptionConverters.toJava(trustStoreFile), 
OptionConverters.toJava(saslProperties))
     new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer)
   }
 
@@ -1388,26 +1335,6 @@ object TestUtils extends Logging {
     new String(bytes, encoding)
   }
 
-  def sslConfigs(mode: ConnectionMode, clientCert: Boolean, trustStoreFile: 
Option[File], certAlias: String,
-                 certCn: String = SslCertificateCn,
-                 tlsProtocol: String = 
TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS): Properties = {
-    val trustStore = trustStoreFile.getOrElse {
-      throw new Exception("SSL enabled but no trustStoreFile provided")
-    }
-
-    val sslConfigs = new TestSslUtils.SslConfigsBuilder(mode)
-      .useClientCert(clientCert)
-      .createNewTrustStore(trustStore)
-      .certAlias(certAlias)
-      .cn(certCn)
-      .tlsProtocol(tlsProtocol)
-      .build()
-
-    val sslProps = new Properties()
-    sslConfigs.forEach { (k, v) => sslProps.put(k, v) }
-    sslProps
-  }
-
   def waitAndVerifyAcls(expected: Set[AccessControlEntry],
                         authorizer: JAuthorizer,
                         resource: ResourcePattern,


Reply via email to