This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new e035f7036d5 MINOR: convert SaslClientsWithInvalidCredentialsTest +
MultipleListenersWithSameSecurityProtocolBaseTest to KRaft (#17803)
e035f7036d5 is described below
commit e035f7036d52cb0c1c3a75209ba831c330011b2f
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Nov 14 15:12:11 2024 -0800
MINOR: convert SaslClientsWithInvalidCredentialsTest +
MultipleListenersWithSameSecurityProtocolBaseTest to KRaft (#17803)
Reviewers: Justine Olshan <[email protected]>
---
.../SaslClientsWithInvalidCredentialsTest.scala | 37 ++++++++++++--------
...ListenersWithSameSecurityProtocolBaseTest.scala | 39 +++++++++++++++-------
2 files changed, 50 insertions(+), 26 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 1a4451c2b0a..af3f030648f 100644
---
a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++
b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -23,17 +23,18 @@ import org.apache.kafka.clients.consumer.{Consumer,
ConsumerConfig}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig,
ProducerRecord}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.errors.SaslAuthenticationException
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions._
import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.zk.ConfigEntityChangeNotificationZNode
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.metadata.storage.Formatter
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{MethodSource, ValueSource}
import scala.jdk.javaapi.OptionConverters
+import scala.util.Using
class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
@@ -57,9 +58,11 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit =
{
super.configureSecurityBeforeServersStart(testInfo)
-
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
- // Create broker credentials before starting brokers
- createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_ADMIN,
JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
+ }
+
+ override def addFormatterSettings(formatter: Formatter): Unit = {
+ formatter.setScramArguments(
+
List(s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]").asJava)
}
override def createPrivilegedAdminClient() = {
@@ -72,7 +75,11 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
startSasl(jaasSections(kafkaServerSaslMechanisms,
Some(kafkaClientSaslMechanism), Both,
JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
super.setUp(testInfo)
- createTopic(topic, numPartitions, brokerCount)
+ Using(createPrivilegedAdminClient()) { superuserAdminClient =>
+ TestUtils.createTopicWithAdmin(
+ superuserAdminClient, topic, brokers, controllerServers, numPartitions
+ )
+ }
}
@AfterEach
@@ -81,7 +88,7 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
closeSasl()
}
- @ParameterizedTest
+
@ParameterizedTest(name="{displayName}.quorum=kraft.isIdempotenceEnabled={0}")
@ValueSource(booleans = Array(true, false))
def testProducerWithAuthenticationFailure(isIdempotenceEnabled: Boolean):
Unit = {
val prop = new Properties()
@@ -101,8 +108,9 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
verifyWithRetry(sendOneRecord(producer2))
}
- @Test
- def testTransactionalProducerWithAuthenticationFailure(): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ def testTransactionalProducerWithAuthenticationFailure(quorum: String,
groupProtocol: String): Unit = {
val txProducer = createTransactionalProducer()
verifyAuthenticationException(txProducer.initTransactions())
@@ -111,7 +119,7 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testConsumerWithAuthenticationFailure(quorum: String, groupProtocol:
String): Unit = {
val consumer = createConsumer()
consumer.subscribe(List(topic).asJava)
@@ -119,7 +127,7 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testManualAssignmentConsumerWithAuthenticationFailure(quorum: String,
groupProtocol: String): Unit = {
val consumer = createConsumer()
consumer.assign(List(tp).asJava)
@@ -127,7 +135,7 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
}
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def
testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure(quorum:
String, groupProtocol: String): Unit = {
this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,
false.toString)
val consumer = createConsumer()
@@ -146,8 +154,9 @@ class SaslClientsWithInvalidCredentialsTest extends
AbstractSaslTest {
verifyWithRetry(assertEquals(1,
consumer.poll(Duration.ofMillis(1000)).count))
}
- @Test
- def testKafkaAdminClientWithAuthenticationFailure(): Unit = {
+ @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+ def testKafkaAdminClientWithAuthenticationFailure(quorum: String,
groupProtocol: String): Unit = {
val props = JaasTestUtils.adminClientSecurityConfigs(securityProtocol,
OptionConverters.toJava(trustStoreFile),
OptionConverters.toJava(clientSaslProperties))
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
val adminClient = Admin.create(props)
diff --git
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 16d1454604e..ec44af7b236 100644
---
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -25,13 +25,14 @@ import kafka.security.JaasTestUtils
import kafka.security.JaasTestUtils.JaasSection
import kafka.utils.{TestInfoUtils, TestUtils}
import kafka.utils.Implicits._
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic}
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
-import org.apache.kafka.server.config.{ReplicationConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs}
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.network.SocketServerConfigs
import org.junit.jupiter.api.Assertions.assertEquals
@@ -57,7 +58,8 @@ abstract class
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
import MultipleListenersWithSameSecurityProtocolBaseTest._
private val trustStoreFile = TestUtils.tempFile("truststore", ".jks")
- private val servers = new ArrayBuffer[KafkaServer]
+ private val servers = new ArrayBuffer[KafkaBroker]
+ private var admin: Admin = null
private val producers = mutable.Map[ClientMetadata,
KafkaProducer[Array[Byte], Array[Byte]]]()
private val consumers = mutable.Map[ClientMetadata, Consumer[Array[Byte],
Array[Byte]]]()
@@ -78,14 +80,15 @@ abstract class
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
(0 until numServers).foreach { brokerId =>
- val props = TestUtils.createBrokerConfig(brokerId, zkConnect,
trustStoreFile = Some(trustStoreFile))
+ val props = TestUtils.createBrokerConfig(brokerId, null, trustStoreFile
= Some(trustStoreFile))
// Ensure that we can support multiple listeners per security protocol
and multiple security protocols
props.put(SocketServerConfigs.LISTENERS_CONFIG,
s"$SecureInternal://localhost:0, $Internal://localhost:0, " +
s"$SecureExternal://localhost:0, $External://localhost:0")
+ props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
props.get(SocketServerConfigs.LISTENERS_CONFIG))
props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
s"$Internal:PLAINTEXT, $SecureInternal:SASL_SSL," +
- s"$External:PLAINTEXT, $SecureExternal:SASL_SSL")
+ s"$External:PLAINTEXT, $SecureExternal:SASL_SSL, CONTROLLER:PLAINTEXT")
+ props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
props.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, Internal)
- props.put(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
props.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
kafkaClientSaslMechanism)
props.put(s"${new
ListenerName(SecureInternal).configPrefix}${BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG}",
kafkaServerSaslMechanisms(SecureInternal).mkString(","))
@@ -103,7 +106,7 @@ abstract class
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
}
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path")
- servers += TestUtils.createServer(KafkaConfig.fromProps(props))
+ servers += createBroker(KafkaConfig.fromProps(props))
}
servers.map(_.config).foreach { config =>
@@ -113,10 +116,20 @@ abstract class
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
s"Unexpected ${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG}
for broker ${config.brokerId}")
}
- TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME,
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_DEFAULT,
- replicationFactor = 2, servers,
servers.head.groupCoordinator.groupMetadataTopicConfigs)
-
- createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_USER,
JaasTestUtils.KAFKA_SCRAM_PASSWORD)
+ val adminClientConfig = new java.util.HashMap[String, Object]()
+ adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+ TestUtils.bootstrapServers(servers, new ListenerName(Internal)))
+ admin = Admin.create(adminClientConfig)
+ val newTopic = new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME,
+ GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_DEFAULT, 2.toShort)
+ val newTopicConfigs = new java.util.HashMap[String, String]()
+ servers.head.groupCoordinator.groupMetadataTopicConfigs.entrySet().
+ forEach(e => newTopicConfigs.put(e.getKey.toString, e.getValue.toString))
+ newTopic.configs(newTopicConfigs)
+ admin.createTopics(java.util.Arrays.asList(newTopic)).all().get(5,
TimeUnit.MINUTES)
+
+ createScramCredentials(admin, JaasTestUtils.KAFKA_SCRAM_USER,
JaasTestUtils.KAFKA_SCRAM_PASSWORD)
+ TestUtils.ensureConsistentKRaftMetadata(servers, controllerServer)
servers.head.config.listeners.foreach { endPoint =>
val listenerName = endPoint.listenerName
@@ -130,7 +143,7 @@ abstract class
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
def addProducerConsumer(listenerName: ListenerName, mechanism: String,
saslProps: Option[Properties]): Unit = {
val topic = s"${listenerName.value}${producers.size}"
- TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+ admin.createTopics(java.util.Arrays.asList(new NewTopic(topic, 2,
2.toShort))).all().get(5, TimeUnit.MINUTES)
val clientMetadata = ClientMetadata(listenerName, mechanism, topic)
producers(clientMetadata) = TestUtils.createProducer(bootstrapServers,
acks = -1,
@@ -153,6 +166,8 @@ abstract class
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
@AfterEach
override def tearDown(): Unit = {
+ Option(admin).foreach(_.close())
+ admin = null
producers.values.foreach(_.close())
consumers.values.foreach(_.close())
TestUtils.shutdownServers(servers)
@@ -165,7 +180,7 @@ abstract class
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
* with acks=-1 to ensure that replication is also working.
*/
@ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
def testProduceConsume(quorum: String, groupProtocol: String): Unit = {
producers.foreach { case (clientMetadata, producer) =>
val producerRecords = (1 to 10).map(i => new
ProducerRecord(clientMetadata.topic, s"key$i".getBytes,