This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b4eb7cf390e KAFKA-16683 Extract security-related helpers from
scala.TestUtils to java class (#16912)
b4eb7cf390e is described below
commit b4eb7cf390e77584c8a0c08955f496552e6725a8
Author: PoAn Yang <[email protected]>
AuthorDate: Fri Sep 27 03:02:11 2024 +0800
KAFKA-16683 Extract security-related helpers from scala.TestUtils to java
class (#16912)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../test/java/kafka/security/JaasTestUtils.java | 108 +++++++++++++++++++++
.../kafka/api/BaseProducerSendTest.scala | 10 +-
.../api/DescribeAuthorizedOperationsTest.scala | 3 +-
.../kafka/api/IntegrationTestHarness.scala | 6 +-
.../SaslClientsWithInvalidCredentialsTest.scala | 4 +-
.../SaslPlainSslEndToEndAuthorizationTest.scala | 11 ++-
.../scala/integration/kafka/api/SaslSetup.scala | 3 +-
.../kafka/api/SslAdminIntegrationTest.scala | 11 ++-
.../kafka/api/SslEndToEndAuthorizationTest.scala | 15 +--
.../server/DynamicBrokerReconfigurationTest.scala | 6 +-
.../kafka/server/IntegrationTestUtils.scala | 10 +-
...ListenersWithSameSecurityProtocolBaseTest.scala | 8 +-
.../DelegationTokenRequestsOnPlainTextTest.scala | 7 +-
.../kafka/server/DelegationTokenRequestsTest.scala | 3 +-
...nTokenRequestsWithDisableTokenFeatureTest.scala | 4 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 89 ++---------------
16 files changed, 175 insertions(+), 123 deletions(-)
diff --git a/core/src/test/java/kafka/security/JaasTestUtils.java
b/core/src/test/java/kafka/security/JaasTestUtils.java
index e80208b77d2..73e8a7245bc 100644
--- a/core/src/test/java/kafka/security/JaasTestUtils.java
+++ b/core/src/test/java/kafka/security/JaasTestUtils.java
@@ -18,9 +18,13 @@ package kafka.security;
import kafka.utils.TestUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.ScramMechanism;
import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.ConnectionMode;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.Java;
+import org.apache.kafka.test.TestSslUtils;
import java.io.BufferedWriter;
import java.io.File;
@@ -103,6 +107,8 @@ public class JaasTestUtils {
public static final String SERVICE_NAME = "kafka";
+ public static final String SSL_CERTIFICATE_CN = "localhost";
+
public static Properties saslConfigs(Optional<Properties> saslProperties) {
Properties result = saslProperties.orElse(new Properties());
if (IS_IBM_SECURITY &&
!result.containsKey(SaslConfigs.SASL_KERBEROS_SERVICE_NAME)) {
@@ -269,4 +275,106 @@ public class JaasTestUtils {
writer.write(String.join("",
jaasSections.stream().map(Object::toString).toArray(String[]::new)));
}
}
+
+ public static boolean usesSslTransportLayer(SecurityProtocol
securityProtocol) {
+ switch (securityProtocol) {
+ case SSL:
+ case SASL_SSL:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ public static boolean usesSaslAuthentication(SecurityProtocol
securityProtocol) {
+ switch (securityProtocol) {
+ case SASL_PLAINTEXT:
+ case SASL_SSL:
+ return true;
+ default:
+ return false;
+ }
+ }
+
+ public static Properties sslConfigs(ConnectionMode mode,
+ boolean clientCert,
+ Optional<File> trustStoreFile,
+ String certAlias) throws Exception {
+ return sslConfigs(mode, clientCert, trustStoreFile, certAlias,
SSL_CERTIFICATE_CN, TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS);
+ }
+
+ public static Properties sslConfigs(ConnectionMode mode,
+ boolean clientCert,
+ Optional<File> trustStoreFile,
+ String certAlias,
+ String certCn,
+ String tlsProtocol) throws Exception {
+ File trustStore = trustStoreFile.orElseThrow(() -> new Exception("SSL
enabled but no trustStoreFile provided"));
+ Properties sslProps = new Properties();
+ sslProps.putAll(new TestSslUtils.SslConfigsBuilder(mode)
+ .useClientCert(clientCert)
+ .createNewTrustStore(trustStore)
+ .certAlias(certAlias)
+ .cn(certCn)
+ .tlsProtocol(tlsProtocol)
+ .build());
+ return sslProps;
+ }
+
+ public static Properties producerSecurityConfigs(SecurityProtocol
securityProtocol,
+ Optional<File>
trustStoreFile,
+ Optional<Properties>
saslProperties) throws Exception {
+ return securityConfigs(ConnectionMode.CLIENT, securityProtocol,
trustStoreFile, "producer", SSL_CERTIFICATE_CN, saslProperties);
+ }
+
+ public static Properties consumerSecurityConfigs(SecurityProtocol
securityProtocol, Optional<File> trustStoreFile, Optional<Properties>
saslProperties) throws Exception {
+ return securityConfigs(ConnectionMode.CLIENT, securityProtocol,
trustStoreFile, "consumer", SSL_CERTIFICATE_CN, saslProperties);
+ }
+
+ public static Properties adminClientSecurityConfigs(SecurityProtocol
securityProtocol, Optional<File> trustStoreFile, Optional<Properties>
saslProperties) throws Exception {
+ return securityConfigs(ConnectionMode.CLIENT, securityProtocol,
trustStoreFile, "admin-client", SSL_CERTIFICATE_CN, saslProperties);
+ }
+
+ public static Properties securityConfigs(ConnectionMode connectionMode,
+ SecurityProtocol securityProtocol,
+ Optional<File> trustStoreFile,
+ String certAlias,
+ String certCn,
+ Optional<Properties>
saslProperties) throws Exception {
+ return securityConfigs(connectionMode, securityProtocol,
trustStoreFile, certAlias, certCn, saslProperties,
+ TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS, Optional.empty());
+ }
+ /**
+ * Returns security configuration options for broker or clients
+ *
+ * @param connectionMode Client or server mode
+ * @param securityProtocol Security protocol which indicates if SASL or
SSL or both configs are included
+ * @param trustStoreFile Trust store file must be provided for SSL and
SASL_SSL
+ * @param certAlias Alias of certificate in SSL key store
+ * @param certCn CN for certificate
+ * @param saslProperties SASL configs if security protocol is SASL_SSL or
SASL_PLAINTEXT
+ * @param tlsProtocol TLS version
+ * @param needsClientCert If not empty, a flag which indicates if client
certificates are required. By default,
+ * client certificates are generated only if
securityProtocol is SSL (not for SASL_SSL).
+ */
+ public static Properties securityConfigs(ConnectionMode connectionMode,
+ SecurityProtocol securityProtocol,
+ Optional<File> trustStoreFile,
+ String certAlias,
+ String certCn,
+ Optional<Properties>
saslProperties,
+ String tlsProtocol,
+ Optional<Boolean>
needsClientCert) throws Exception {
+ Properties props = new Properties();
+ if (usesSslTransportLayer(securityProtocol)) {
+ boolean addClientCert = needsClientCert.orElse(securityProtocol ==
SecurityProtocol.SSL);
+ props.putAll(sslConfigs(connectionMode, addClientCert,
trustStoreFile, certAlias, certCn, tlsProtocol));
+ }
+
+ if (usesSaslAuthentication(securityProtocol)) {
+ props.putAll(saslConfigs(saslProperties));
+ }
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
securityProtocol.name());
+ return props;
+ }
}
diff --git
a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
index 6e8341d7358..2bf6c722cb5 100644
--- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala
@@ -22,6 +22,7 @@ import java.nio.charset.StandardCharsets
import java.util.{Collections, Properties}
import java.util.concurrent.TimeUnit
import kafka.integration.KafkaServerTestHarness
+import kafka.security.JaasTestUtils
import kafka.server.KafkaConfig
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, NewPartitions}
@@ -29,7 +30,7 @@ import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors.TimeoutException
-import org.apache.kafka.common.network.{ListenerName, ConnectionMode}
+import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.{KafkaException, TopicPartition}
@@ -40,6 +41,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import scala.collection.mutable
+import scala.compat.java8.OptionConverters
import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
@@ -70,12 +72,12 @@ abstract class BaseProducerSendTest extends
KafkaServerTestHarness {
super.setUp(testInfo)
admin = TestUtils.createAdminClient(brokers, listenerName,
- TestUtils.securityConfigs(ConnectionMode.CLIENT,
+ JaasTestUtils.securityConfigs(ConnectionMode.CLIENT,
securityProtocol,
- trustStoreFile,
+ OptionConverters.toJava(trustStoreFile),
"adminClient",
TestUtils.SslCertificateCn,
- clientSaslProperties))
+ OptionConverters.toJava(clientSaslProperties)))
consumer = TestUtils.createConsumer(
bootstrapServers(listenerName =
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)),
diff --git
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
index a9c54039750..7a778a12e03 100644
---
a/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/DescribeAuthorizedOperationsTest.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.server.config.{ServerConfigs,
ZkConfigs}
import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNull}
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import scala.compat.java8.OptionConverters
import scala.jdk.CollectionConverters._
object DescribeAuthorizedOperationsTest {
@@ -126,7 +127,7 @@ class DescribeAuthorizedOperationsTest extends
IntegrationTestHarness with SaslS
adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers())
adminClientConfig.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000")
val securityProps: util.Map[Object, Object] =
- TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile,
clientSaslProperties)
+ JaasTestUtils.adminClientSecurityConfigs(securityProtocol,
OptionConverters.toJava(trustStoreFile),
OptionConverters.toJava(clientSaslProperties))
adminClientConfig.putAll(securityProps)
adminClientConfig
}
diff --git
a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
index 3d58441c8d4..8f6271c8e5c 100644
--- a/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
@@ -26,6 +26,7 @@ import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import kafka.server.KafkaConfig
import kafka.integration.KafkaServerTestHarness
+import kafka.security.JaasTestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.serialization.{ByteArrayDeserializer,
ByteArraySerializer, Deserializer, Serializer}
@@ -36,6 +37,7 @@ import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import scala.collection.mutable
import scala.collection.Seq
+import scala.compat.java8.OptionConverters
/**
* A helper class for writing integration tests that involve producers,
consumers, and servers
@@ -170,8 +172,8 @@ abstract class IntegrationTestHarness extends
KafkaServerTestHarness {
}
def clientSecurityProps(certAlias: String): Properties = {
- TestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol,
trustStoreFile, certAlias, TestUtils.SslCertificateCn,
- clientSaslProperties)
+ JaasTestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol,
OptionConverters.toJava(trustStoreFile), certAlias,
+ JaasTestUtils.SSL_CERTIFICATE_CN,
OptionConverters.toJava(clientSaslProperties))
}
def superuserSecurityProps(certAlias: String): Properties = {
diff --git
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 7190c939b2f..d99c8d41640 100644
---
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -32,6 +32,8 @@ import
org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+import scala.compat.java8.OptionConverters
+
class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
private val kafkaClientSaslMechanism = "SCRAM-SHA-256"
private val kafkaServerSaslMechanisms = List(kafkaClientSaslMechanism)
@@ -141,7 +143,7 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
@Test
def testKafkaAdminClientWithAuthenticationFailure(): Unit = {
- val props = TestUtils.adminClientSecurityConfigs(securityProtocol,
trustStoreFile, clientSaslProperties)
+ val props = JaasTestUtils.adminClientSecurityConfigs(securityProtocol,
OptionConverters.toJava(trustStoreFile),
OptionConverters.toJava(clientSaslProperties))
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val adminClient = Admin.create(props)
diff --git
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
index aa85d9eb531..5f1226adcea 100644
---
a/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslPlainSslEndToEndAuthorizationTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.api
-import kafka.security.JaasModule
+import kafka.security.{JaasModule, JaasTestUtils}
import kafka.security.JaasTestUtils._
import kafka.utils.TestUtils
import kafka.utils.TestUtils.isAclSecure
@@ -27,15 +27,17 @@ import org.apache.kafka.common.network.ConnectionMode
import org.apache.kafka.common.security.auth._
import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
import org.apache.kafka.common.security.plain.PlainAuthenticateCallback
+import org.apache.kafka.test.TestSslUtils
import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
import org.junit.jupiter.api.Test
import java.security.AccessController
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
import javax.security.auth.Subject
import javax.security.auth.callback._
import javax.security.auth.login.AppConfigurationEntry
import scala.collection.Seq
+import scala.compat.java8.OptionConverters
import scala.jdk.CollectionConverters._
object SaslPlainSslEndToEndAuthorizationTest {
@@ -141,8 +143,9 @@ class SaslPlainSslEndToEndAuthorizationTest extends
SaslEndToEndAuthorizationTes
// Generate SSL certificates for clients since we are enabling TLS mutual
authentication
// in this test for the SASL_SSL listener.
override def clientSecurityProps(certAlias: String): Properties = {
- TestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol,
trustStoreFile, certAlias, TestUtils.SslCertificateCn,
- clientSaslProperties, needsClientCert = Some(true))
+ JaasTestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol,
OptionConverters.toJava(trustStoreFile),
+ certAlias, JaasTestUtils.SSL_CERTIFICATE_CN,
OptionConverters.toJava(clientSaslProperties),
+ TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS, Optional.of(true))
}
/**
diff --git a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
index 1a4e6a5425a..25cd86cab53 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSetup.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSetup.scala
@@ -39,6 +39,7 @@ import java.util
import java.util.Properties
import javax.security.auth.login.Configuration
import scala.collection.Seq
+import scala.compat.java8.OptionConverters
import scala.jdk.CollectionConverters._
import scala.jdk.OptionConverters._
import scala.util.Using
@@ -176,7 +177,7 @@ trait SaslSetup {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
val securityProps: util.Map[Object, Object] =
- TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile,
clientSaslProperties)
+ JaasTestUtils.adminClientSecurityConfigs(securityProtocol,
OptionConverters.toJava(trustStoreFile),
OptionConverters.toJava(clientSaslProperties))
securityProps.forEach { (key, value) =>
config.put(key.asInstanceOf[String], value) }
config.put(SaslConfigs.SASL_JAAS_CONFIG,
jaasScramClientLoginModule(scramMechanism, user, password))
Admin.create(config)
diff --git
a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
index fdaafce2659..bc6bd972f3a 100644
--- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
@@ -15,8 +15,8 @@ package kafka.api
import java.util
import java.util.concurrent._
import java.util.Properties
-
import com.yammer.metrics.core.Gauge
+import kafka.security.JaasTestUtils
import kafka.security.authorizer.AclAuthorizer
import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.CreateAclsResult
@@ -35,6 +35,7 @@ import org.junit.jupiter.api.{AfterEach, Test}
import scala.jdk.CollectionConverters._
import scala.collection.mutable
+import scala.compat.java8.OptionConverters
object SslAdminIntegrationTest {
@volatile var semaphore: Option[Semaphore] = None
@@ -284,16 +285,16 @@ class SslAdminIntegrationTest extends
SaslSslAdminIntegrationTest {
// Override the CN to create a principal based on it
override def superuserSecurityProps(certAlias: String): Properties = {
- val props = TestUtils.securityConfigs(ConnectionMode.CLIENT,
securityProtocol, trustStoreFile, certAlias,
SslAdminIntegrationTest.superuserCn,
- clientSaslProperties)
+ val props = JaasTestUtils.securityConfigs(ConnectionMode.CLIENT,
securityProtocol, OptionConverters.toJava(trustStoreFile),
+ certAlias, SslAdminIntegrationTest.superuserCn,
OptionConverters.toJava(clientSaslProperties))
props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
props
}
// Override the CN to create a principal based on it
override def clientSecurityProps(certAlias: String): Properties = {
- val props = TestUtils.securityConfigs(ConnectionMode.CLIENT,
securityProtocol, trustStoreFile, certAlias, SslAdminIntegrationTest.clientCn,
- clientSaslProperties)
+ val props = JaasTestUtils.securityConfigs(ConnectionMode.CLIENT,
securityProtocol, OptionConverters.toJava(trustStoreFile),
+ certAlias, SslAdminIntegrationTest.clientCn,
OptionConverters.toJava(clientSaslProperties))
props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
props
}
diff --git
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
index 5c5c24d36cc..5fc8295626e 100644
---
a/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SslEndToEndAuthorizationTest.scala
@@ -17,9 +17,9 @@
package kafka.api
-import java.util.Properties
+import kafka.security.JaasTestUtils
-import kafka.utils.TestUtils
+import java.util.Properties
import org.apache.kafka.common.config.SslConfigs
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.network.ConnectionMode
@@ -28,6 +28,9 @@ import
org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuild
import org.apache.kafka.common.utils.Java
import org.junit.jupiter.api.{BeforeEach, TestInfo}
+import java.util.Optional
+import scala.compat.java8.OptionConverters
+
object SslEndToEndAuthorizationTest {
val superuserCn = "super-user"
@@ -78,16 +81,16 @@ class SslEndToEndAuthorizationTest extends
EndToEndAuthorizationTest {
}
override def clientSecurityProps(certAlias: String): Properties = {
- val props = TestUtils.securityConfigs(ConnectionMode.CLIENT,
securityProtocol, trustStoreFile,
- certAlias, clientCn, clientSaslProperties, tlsProtocol)
+ val props = JaasTestUtils.securityConfigs(ConnectionMode.CLIENT,
securityProtocol, OptionConverters.toJava(trustStoreFile),
+ certAlias, clientCn, OptionConverters.toJava(clientSaslProperties),
tlsProtocol, Optional.empty())
props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
props
}
// This test doesn't really care about matching the SSL certificate to a
particular principal
// We can override the CN and create a principal based on it or on the
server SSL
override def superuserSecurityProps(certAlias: String): Properties = {
- val props = TestUtils.securityConfigs(ConnectionMode.CLIENT,
securityProtocol, trustStoreFile,
- certAlias, superuserCn, clientSaslProperties, tlsProtocol)
+ val props = JaasTestUtils.securityConfigs(ConnectionMode.CLIENT,
securityProtocol, OptionConverters.toJava(trustStoreFile),
+ certAlias, superuserCn, OptionConverters.toJava(clientSaslProperties),
tlsProtocol, Optional.empty())
props.remove(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG)
props
}
diff --git
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 6549bc925fd..93418cf99f4 100644
---
a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++
b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -24,7 +24,7 @@ import java.lang.management.ManagementFactory
import java.security.KeyStore
import java.time.Duration
import java.util
-import java.util.{Collections, Properties}
+import java.util.{Collections, Optional, Properties}
import java.util.concurrent._
import javax.management.ObjectName
import com.yammer.metrics.core.MetricName
@@ -107,8 +107,8 @@ class DynamicBrokerReconfigurationTest extends
QuorumTestHarness with SaslSetup
private val trustStoreFile1 = TestUtils.tempFile("truststore", ".jks")
private val trustStoreFile2 = TestUtils.tempFile("truststore", ".jks")
- private val sslProperties1 = TestUtils.sslConfigs(ConnectionMode.SERVER,
clientCert = false, Some(trustStoreFile1), "kafka")
- private val sslProperties2 = TestUtils.sslConfigs(ConnectionMode.SERVER,
clientCert = false, Some(trustStoreFile2), "kafka")
+ private val sslProperties1 = JaasTestUtils.sslConfigs(ConnectionMode.SERVER,
false, Optional.of(trustStoreFile1), "kafka")
+ private val sslProperties2 = JaasTestUtils.sslConfigs(ConnectionMode.SERVER,
false, Optional.of(trustStoreFile2), "kafka")
private val invalidSslProperties = invalidSslConfigs
@BeforeEach
diff --git
a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
index 0623a16b388..7e357eb6e2b 100644
--- a/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
+++ b/core/src/test/scala/integration/kafka/server/IntegrationTestUtils.scala
@@ -20,13 +20,12 @@ package kafka.server
import java.io.{DataInputStream, DataOutputStream}
import java.net.Socket
import java.nio.ByteBuffer
-import java.util.{Collections, Properties}
-
+import java.util.{Collections, Optional, Properties}
import kafka.network.SocketServer
+import kafka.security.JaasTestUtils
import kafka.utils.Implicits._
-import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, NewTopic}
-import org.apache.kafka.common.network.{ListenerName, ConnectionMode}
+import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
import org.apache.kafka.common.protocol.ApiKeys
import org.apache.kafka.common.requests.{AbstractRequest, AbstractResponse,
RequestHeader, ResponseHeader}
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -139,6 +138,7 @@ object IntegrationTestUtils {
}
def clientSecurityProps(certAlias: String): Properties = {
- TestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol, None,
certAlias, TestUtils.SslCertificateCn, None) // TODO use real trust store and
client SASL properties
+ JaasTestUtils.securityConfigs(ConnectionMode.CLIENT, securityProtocol,
Optional.empty(), certAlias,
+ JaasTestUtils.SSL_CERTIFICATE_CN, Optional.empty()) // TODO use real
trust store and client SASL properties
}
}
diff --git
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 93dcce79826..28ac8b31237 100644
---
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -18,7 +18,7 @@
package kafka.server
-import java.util.{Collections, Objects, Properties}
+import java.util.{Collections, Objects, Optional, Properties}
import java.util.concurrent.TimeUnit
import kafka.api.SaslSetup
import kafka.security.JaasTestUtils
@@ -92,7 +92,7 @@ abstract class
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
props.put(SaslConfigs.SASL_KERBEROS_SERVICE_NAME, "kafka")
props ++= dynamicJaasSections
- props ++= TestUtils.sslConfigs(ConnectionMode.SERVER, clientCert =
false, Some(trustStoreFile), s"server$brokerId")
+ props ++= JaasTestUtils.sslConfigs(ConnectionMode.SERVER, false,
Optional.of(trustStoreFile), s"server$brokerId")
// set listener-specific configs and set an invalid path for the global
config to verify that the overrides work
Seq(SecureInternal, SecureExternal).foreach { listenerName =>
@@ -120,7 +120,7 @@ abstract class
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
val listenerName = endPoint.listenerName
val trustStoreFile =
- if (TestUtils.usesSslTransportLayer(endPoint.securityProtocol))
Some(this.trustStoreFile)
+ if (JaasTestUtils.usesSslTransportLayer(endPoint.securityProtocol))
Some(this.trustStoreFile)
else None
val bootstrapServers = TestUtils.bootstrapServers(servers, listenerName)
@@ -138,7 +138,7 @@ abstract class
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
securityProtocol = endPoint.securityProtocol, trustStoreFile =
trustStoreFile, saslProperties = saslProps)
}
- if (TestUtils.usesSaslAuthentication(endPoint.securityProtocol)) {
+ if (JaasTestUtils.usesSaslAuthentication(endPoint.securityProtocol)) {
kafkaServerSaslMechanisms(endPoint.listenerName.value).foreach {
mechanism =>
addProducerConsumer(listenerName, mechanism,
Some(kafkaClientSaslProperties(mechanism, dynamicJaasConfig = true)))
}
diff --git
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
index a64792f8408..ca1d32e91b9 100644
---
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
+++
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsOnPlainTextTest.scala
@@ -16,9 +16,9 @@
*/
package kafka.server
-import java.util
+import kafka.security.JaasTestUtils
-import kafka.utils.TestUtils
+import java.util
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.errors.UnsupportedByAuthenticationException
import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
@@ -26,6 +26,7 @@ import org.junit.jupiter.api.Assertions.assertThrows
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
+import scala.compat.java8.OptionConverters
import scala.concurrent.ExecutionException
class DelegationTokenRequestsOnPlainTextTest extends BaseRequestTest {
@@ -42,7 +43,7 @@ class DelegationTokenRequestsOnPlainTextTest extends
BaseRequestTest {
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val securityProps: util.Map[Object, Object] =
- TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile,
clientSaslProperties)
+ JaasTestUtils.adminClientSecurityConfigs(securityProtocol,
OptionConverters.toJava(trustStoreFile),
OptionConverters.toJava(clientSaslProperties))
securityProps.forEach { (key, value) =>
config.put(key.asInstanceOf[String], value) }
config
}
diff --git
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
index bf134c5a4bc..18f7b3f538e 100644
--- a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala
@@ -30,6 +30,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util
+import scala.compat.java8.OptionConverters
import scala.concurrent.ExecutionException
import scala.jdk.CollectionConverters._
@@ -59,7 +60,7 @@ class DelegationTokenRequestsTest extends
IntegrationTestHarness with SaslSetup
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val securityProps: util.Map[Object, Object] =
- TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile,
clientSaslProperties)
+ JaasTestUtils.adminClientSecurityConfigs(securityProtocol,
OptionConverters.toJava(trustStoreFile),
OptionConverters.toJava(clientSaslProperties))
securityProps.forEach { (key, value) =>
config.put(key.asInstanceOf[String], value) }
config
}
diff --git
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
index ff662159178..a28d784c5cb 100644
---
a/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
+++
b/core/src/test/scala/unit/kafka/server/DelegationTokenRequestsWithDisableTokenFeatureTest.scala
@@ -18,7 +18,6 @@ package kafka.server
import kafka.api.{KafkaSasl, SaslSetup}
import kafka.security.JaasTestUtils
-import kafka.utils.TestUtils
import org.apache.kafka.clients.admin.{Admin, AdminClientConfig}
import org.apache.kafka.common.errors.DelegationTokenDisabledException
import org.apache.kafka.common.security.auth.SecurityProtocol
@@ -28,6 +27,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
import java.util
+import scala.compat.java8.OptionConverters
import scala.concurrent.ExecutionException
class DelegationTokenRequestsWithDisableTokenFeatureTest extends
BaseRequestTest with SaslSetup {
@@ -50,7 +50,7 @@ class DelegationTokenRequestsWithDisableTokenFeatureTest
extends BaseRequestTest
val config = new util.HashMap[String, Object]
config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val securityProps: util.Map[Object, Object] =
- TestUtils.adminClientSecurityConfigs(securityProtocol, trustStoreFile,
clientSaslProperties)
+ JaasTestUtils.adminClientSecurityConfigs(securityProtocol,
OptionConverters.toJava(trustStoreFile),
OptionConverters.toJava(clientSaslProperties))
securityProps.forEach { (key, value) =>
config.put(key.asInstanceOf[String], value) }
config
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index f503caf43c4..a833258836a 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -30,7 +30,7 @@ import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.consumer._
import org.apache.kafka.clients.consumer.internals.AbstractCoordinator
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
-import org.apache.kafka.clients.{ClientResponse, CommonClientConfigs}
+import org.apache.kafka.clients.ClientResponse
import org.apache.kafka.common._
import org.apache.kafka.common.acl.{AccessControlEntry,
AccessControlEntryFilter, AclBindingFilter}
import org.apache.kafka.common.compress.Compression
@@ -67,7 +67,7 @@ import org.apache.kafka.server.{ClientMetricsManager,
ControllerRequestCompletio
import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
import org.apache.kafka.storage.internals.log.{CleanerConfig, LogConfig,
LogDirFailureChannel, ProducerStateManagerConfig}
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
-import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
+import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.apache.zookeeper.KeeperException.SessionExpiredException
import org.apache.zookeeper.ZooDefs._
import org.apache.zookeeper.data.ACL
@@ -88,6 +88,7 @@ import java.util.{Arrays, Collections, Optional, Properties}
import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer
import scala.collection.{Map, Seq, mutable}
+import scala.compat.java8.OptionConverters
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.jdk.CollectionConverters._
@@ -360,10 +361,10 @@ object TestUtils extends Logging {
props.put(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG, "2")
props.put(ServerConfigs.BACKGROUND_THREADS_CONFIG, "2")
- if (protocolAndPorts.exists { case (protocol, _) =>
usesSslTransportLayer(protocol) })
- props ++= sslConfigs(ConnectionMode.SERVER, false, trustStoreFile,
s"server$nodeId")
+ if (protocolAndPorts.exists { case (protocol, _) =>
JaasTestUtils.usesSslTransportLayer(protocol) })
+ props ++= JaasTestUtils.sslConfigs(ConnectionMode.SERVER, false,
OptionConverters.toJava(trustStoreFile), s"server$nodeId")
- if (protocolAndPorts.exists { case (protocol, _) =>
usesSaslAuthentication(protocol) })
+ if (protocolAndPorts.exists { case (protocol, _) =>
JaasTestUtils.usesSaslAuthentication(protocol) })
props ++= JaasTestUtils.saslConfigs(saslProperties.toJava)
interBrokerSecurityProtocol.foreach { protocol =>
@@ -646,44 +647,6 @@ object TestUtils extends Logging {
*/
def randomBytes(numBytes: Int): Array[Byte] =
JTestUtils.randomBytes(numBytes)
- /**
- * Returns security configuration options for broker or clients
- *
- * @param connectionMode Client or server mode
- * @param securityProtocol Security protocol which indicates if SASL or SSL
or both configs are included
- * @param trustStoreFile Trust store file must be provided for SSL and
SASL_SSL
- * @param certAlias Alias of certificate in SSL key store
- * @param certCn CN for certificate
- * @param saslProperties SASL configs if security protocol is SASL_SSL or
SASL_PLAINTEXT
- * @param tlsProtocol TLS version
- * @param needsClientCert If not empty, a flag which indicates if client
certificates are required. By default
- * client certificates are generated only if
securityProtocol is SSL (not for SASL_SSL).
- */
- def securityConfigs(connectionMode: ConnectionMode,
- securityProtocol: SecurityProtocol,
- trustStoreFile: Option[File],
- certAlias: String,
- certCn: String,
- saslProperties: Option[Properties],
- tlsProtocol: String =
TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS,
- needsClientCert: Option[Boolean] = None): Properties = {
- val props = new Properties
- if (usesSslTransportLayer(securityProtocol)) {
- val addClientCert = needsClientCert.getOrElse(securityProtocol ==
SecurityProtocol.SSL)
- props ++= sslConfigs(connectionMode, addClientCert, trustStoreFile,
certAlias, certCn, tlsProtocol)
- }
-
- if (usesSaslAuthentication(securityProtocol))
- props ++= JaasTestUtils.saslConfigs(saslProperties.toJava)
- props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
securityProtocol.name)
- props
- }
-
- def producerSecurityConfigs(securityProtocol: SecurityProtocol,
- trustStoreFile: Option[File],
- saslProperties: Option[Properties]): Properties =
- securityConfigs(ConnectionMode.CLIENT, securityProtocol, trustStoreFile,
"producer", SslCertificateCn, saslProperties)
-
/**
* Create a (new) producer with a few pre-configured properties.
*/
@@ -715,26 +678,10 @@ object TestUtils extends Logging {
producerProps.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize.toString)
producerProps.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, compressionType)
producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG,
enableIdempotence.toString)
- producerProps ++= producerSecurityConfigs(securityProtocol,
trustStoreFile, saslProperties)
+ producerProps ++= JaasTestUtils.producerSecurityConfigs(securityProtocol,
OptionConverters.toJava(trustStoreFile),
OptionConverters.toJava(saslProperties))
new KafkaProducer[K, V](producerProps, keySerializer, valueSerializer)
}
- def usesSslTransportLayer(securityProtocol: SecurityProtocol): Boolean =
securityProtocol match {
- case SecurityProtocol.SSL | SecurityProtocol.SASL_SSL => true
- case _ => false
- }
-
- def usesSaslAuthentication(securityProtocol: SecurityProtocol): Boolean =
securityProtocol match {
- case SecurityProtocol.SASL_PLAINTEXT | SecurityProtocol.SASL_SSL => true
- case _ => false
- }
-
- def consumerSecurityConfigs(securityProtocol: SecurityProtocol,
trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
- securityConfigs(ConnectionMode.CLIENT, securityProtocol, trustStoreFile,
"consumer", SslCertificateCn, saslProperties)
-
- def adminClientSecurityConfigs(securityProtocol: SecurityProtocol,
trustStoreFile: Option[File], saslProperties: Option[Properties]): Properties =
- securityConfigs(ConnectionMode.CLIENT, securityProtocol, trustStoreFile,
"admin-client", SslCertificateCn, saslProperties)
-
/**
* Create a consumer with a few pre-configured properties.
*/
@@ -758,7 +705,7 @@ object TestUtils extends Logging {
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
enableAutoCommit.toString)
consumerProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,
maxPollRecords.toString)
consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, if
(readCommitted) "read_committed" else "read_uncommitted")
- consumerProps ++= consumerSecurityConfigs(securityProtocol,
trustStoreFile, saslProperties)
+ consumerProps ++= JaasTestUtils.consumerSecurityConfigs(securityProtocol,
OptionConverters.toJava(trustStoreFile),
OptionConverters.toJava(saslProperties))
new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer)
}
@@ -1388,26 +1335,6 @@ object TestUtils extends Logging {
new String(bytes, encoding)
}
- def sslConfigs(mode: ConnectionMode, clientCert: Boolean, trustStoreFile:
Option[File], certAlias: String,
- certCn: String = SslCertificateCn,
- tlsProtocol: String =
TestSslUtils.DEFAULT_TLS_PROTOCOL_FOR_TESTS): Properties = {
- val trustStore = trustStoreFile.getOrElse {
- throw new Exception("SSL enabled but no trustStoreFile provided")
- }
-
- val sslConfigs = new TestSslUtils.SslConfigsBuilder(mode)
- .useClientCert(clientCert)
- .createNewTrustStore(trustStore)
- .certAlias(certAlias)
- .cn(certCn)
- .tlsProtocol(tlsProtocol)
- .build()
-
- val sslProps = new Properties()
- sslConfigs.forEach { (k, v) => sslProps.put(k, v) }
- sslProps
- }
-
def waitAndVerifyAcls(expected: Set[AccessControlEntry],
authorizer: JAuthorizer,
resource: ResourcePattern,