This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e035f7036d5 MINOR: convert SaslClientsWithInvalidCredentialsTest + MultipleListenersWithSameSecurityProtocolBaseTest to KRaft (#17803)
e035f7036d5 is described below

commit e035f7036d52cb0c1c3a75209ba831c330011b2f
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Nov 14 15:12:11 2024 -0800

    MINOR: convert SaslClientsWithInvalidCredentialsTest + MultipleListenersWithSameSecurityProtocolBaseTest to KRaft (#17803)
    
    Reviewers: Justine Olshan <[email protected]>
---
 .../SaslClientsWithInvalidCredentialsTest.scala    | 37 ++++++++++++--------
 ...ListenersWithSameSecurityProtocolBaseTest.scala | 39 +++++++++++++++-------
 2 files changed, 50 insertions(+), 26 deletions(-)

diff --git a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
index 1a4451c2b0a..af3f030648f 100644
--- a/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslClientsWithInvalidCredentialsTest.scala
@@ -23,17 +23,18 @@ import org.apache.kafka.clients.consumer.{Consumer, 
ConsumerConfig}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, 
ProducerRecord}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.SaslAuthenticationException
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
 import org.junit.jupiter.api.Assertions._
 import kafka.utils.{TestInfoUtils, TestUtils}
-import kafka.zk.ConfigEntityChangeNotificationZNode
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
+import org.apache.kafka.metadata.storage.Formatter
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{MethodSource, ValueSource}
 
 import scala.jdk.javaapi.OptionConverters
+import scala.util.Using
 
 
 class SaslClientsWithInvalidCredentialsTest extends AbstractSaslTest {
@@ -57,9 +58,11 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
 
   override def configureSecurityBeforeServersStart(testInfo: TestInfo): Unit = 
{
     super.configureSecurityBeforeServersStart(testInfo)
-    
zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
-    // Create broker credentials before starting brokers
-    createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_ADMIN, 
JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD)
+  }
+
+  override def addFormatterSettings(formatter: Formatter): Unit = {
+    formatter.setScramArguments(
+      
List(s"SCRAM-SHA-256=[name=${JaasTestUtils.KAFKA_SCRAM_ADMIN},password=${JaasTestUtils.KAFKA_SCRAM_ADMIN_PASSWORD}]").asJava)
   }
 
   override def createPrivilegedAdminClient() = {
@@ -72,7 +75,11 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
     startSasl(jaasSections(kafkaServerSaslMechanisms, 
Some(kafkaClientSaslMechanism), Both,
       JaasTestUtils.KAFKA_SERVER_CONTEXT_NAME))
     super.setUp(testInfo)
-    createTopic(topic, numPartitions, brokerCount)
+    Using(createPrivilegedAdminClient()) { superuserAdminClient =>
+      TestUtils.createTopicWithAdmin(
+        superuserAdminClient, topic, brokers, controllerServers, numPartitions
+      )
+    }
   }
 
   @AfterEach
@@ -81,7 +88,7 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
     closeSasl()
   }
 
-  @ParameterizedTest
+  
@ParameterizedTest(name="{displayName}.quorum=kraft.isIdempotenceEnabled={0}")
   @ValueSource(booleans = Array(true, false))
   def testProducerWithAuthenticationFailure(isIdempotenceEnabled: Boolean): 
Unit = {
     val prop = new Properties()
@@ -101,8 +108,9 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
     verifyWithRetry(sendOneRecord(producer2))
   }
 
-  @Test
-  def testTransactionalProducerWithAuthenticationFailure(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testTransactionalProducerWithAuthenticationFailure(quorum: String, 
groupProtocol: String): Unit = {
     val txProducer = createTransactionalProducer()
     verifyAuthenticationException(txProducer.initTransactions())
 
@@ -111,7 +119,7 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
   def testConsumerWithAuthenticationFailure(quorum: String, groupProtocol: 
String): Unit = {
     val consumer = createConsumer()
     consumer.subscribe(List(topic).asJava)
@@ -119,7 +127,7 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
   def testManualAssignmentConsumerWithAuthenticationFailure(quorum: String, 
groupProtocol: String): Unit = {
     val consumer = createConsumer()
     consumer.assign(List(tp).asJava)
@@ -127,7 +135,7 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
   }
 
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
   def 
testManualAssignmentConsumerWithAutoCommitDisabledWithAuthenticationFailure(quorum:
 String, groupProtocol: String): Unit = {
     this.consumerConfig.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 
false.toString)
     val consumer = createConsumer()
@@ -146,8 +154,9 @@ class SaslClientsWithInvalidCredentialsTest extends 
AbstractSaslTest {
     verifyWithRetry(assertEquals(1, 
consumer.poll(Duration.ofMillis(1000)).count))
   }
 
-  @Test
-  def testKafkaAdminClientWithAuthenticationFailure(): Unit = {
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
+  def testKafkaAdminClientWithAuthenticationFailure(quorum: String, 
groupProtocol: String): Unit = {
     val props = JaasTestUtils.adminClientSecurityConfigs(securityProtocol, 
OptionConverters.toJava(trustStoreFile), 
OptionConverters.toJava(clientSaslProperties))
     props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers())
     val adminClient = Admin.create(props)
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index 16d1454604e..ec44af7b236 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -25,13 +25,14 @@ import kafka.security.JaasTestUtils
 import kafka.security.JaasTestUtils.JaasSection
 import kafka.utils.{TestInfoUtils, TestUtils}
 import kafka.utils.Implicits._
+import org.apache.kafka.clients.admin.{Admin, AdminClientConfig, NewTopic}
 import org.apache.kafka.clients.consumer.Consumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.{SaslConfigs, SslConfigs}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ConnectionMode, ListenerName}
-import org.apache.kafka.server.config.{ReplicationConfigs, ZkConfigs}
+import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs}
 import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.network.SocketServerConfigs
 import org.junit.jupiter.api.Assertions.assertEquals
@@ -57,7 +58,8 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
   import MultipleListenersWithSameSecurityProtocolBaseTest._
 
   private val trustStoreFile = TestUtils.tempFile("truststore", ".jks")
-  private val servers = new ArrayBuffer[KafkaServer]
+  private val servers = new ArrayBuffer[KafkaBroker]
+  private var admin: Admin = null
   private val producers = mutable.Map[ClientMetadata, 
KafkaProducer[Array[Byte], Array[Byte]]]()
   private val consumers = mutable.Map[ClientMetadata, Consumer[Array[Byte], 
Array[Byte]]]()
 
@@ -78,14 +80,15 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
 
     (0 until numServers).foreach { brokerId =>
 
-      val props = TestUtils.createBrokerConfig(brokerId, zkConnect, 
trustStoreFile = Some(trustStoreFile))
+      val props = TestUtils.createBrokerConfig(brokerId, null, trustStoreFile 
= Some(trustStoreFile))
       // Ensure that we can support multiple listeners per security protocol 
and multiple security protocols
       props.put(SocketServerConfigs.LISTENERS_CONFIG, 
s"$SecureInternal://localhost:0, $Internal://localhost:0, " +
         s"$SecureExternal://localhost:0, $External://localhost:0")
+      props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
props.get(SocketServerConfigs.LISTENERS_CONFIG))
       props.put(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
s"$Internal:PLAINTEXT, $SecureInternal:SASL_SSL," +
-        s"$External:PLAINTEXT, $SecureExternal:SASL_SSL")
+        s"$External:PLAINTEXT, $SecureExternal:SASL_SSL, CONTROLLER:PLAINTEXT")
+      props.put(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG, "CONTROLLER")
       props.put(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, Internal)
-      props.put(ZkConfigs.ZK_ENABLE_SECURE_ACLS_CONFIG, "true")
       
props.put(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG, 
kafkaClientSaslMechanism)
       props.put(s"${new 
ListenerName(SecureInternal).configPrefix}${BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG}",
         kafkaServerSaslMechanisms(SecureInternal).mkString(","))
@@ -103,7 +106,7 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
       }
       props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path")
 
-      servers += TestUtils.createServer(KafkaConfig.fromProps(props))
+      servers += createBroker(KafkaConfig.fromProps(props))
     }
 
     servers.map(_.config).foreach { config =>
@@ -113,10 +116,20 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
         s"Unexpected ${ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG} 
for broker ${config.brokerId}")
     }
 
-    TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, 
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_DEFAULT,
-      replicationFactor = 2, servers, 
servers.head.groupCoordinator.groupMetadataTopicConfigs)
-
-    createScramCredentials(zkConnect, JaasTestUtils.KAFKA_SCRAM_USER, 
JaasTestUtils.KAFKA_SCRAM_PASSWORD)
+    val adminClientConfig = new java.util.HashMap[String, Object]()
+    adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
+        TestUtils.bootstrapServers(servers, new ListenerName(Internal)))
+    admin = Admin.create(adminClientConfig)
+    val newTopic = new NewTopic(Topic.GROUP_METADATA_TOPIC_NAME,
+      GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_DEFAULT, 2.toShort)
+    val newTopicConfigs = new java.util.HashMap[String, String]()
+    servers.head.groupCoordinator.groupMetadataTopicConfigs.entrySet().
+      forEach(e => newTopicConfigs.put(e.getKey.toString, e.getValue.toString))
+    newTopic.configs(newTopicConfigs)
+    admin.createTopics(java.util.Arrays.asList(newTopic)).all().get(5, 
TimeUnit.MINUTES)
+
+    createScramCredentials(admin, JaasTestUtils.KAFKA_SCRAM_USER, 
JaasTestUtils.KAFKA_SCRAM_PASSWORD)
+    TestUtils.ensureConsistentKRaftMetadata(servers, controllerServer)
 
     servers.head.config.listeners.foreach { endPoint =>
       val listenerName = endPoint.listenerName
@@ -130,7 +143,7 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
       def addProducerConsumer(listenerName: ListenerName, mechanism: String, 
saslProps: Option[Properties]): Unit = {
 
         val topic = s"${listenerName.value}${producers.size}"
-        TestUtils.createTopic(zkClient, topic, 2, 2, servers)
+        admin.createTopics(java.util.Arrays.asList(new NewTopic(topic, 2, 
2.toShort))).all().get(5, TimeUnit.MINUTES)
         val clientMetadata = ClientMetadata(listenerName, mechanism, topic)
 
         producers(clientMetadata) = TestUtils.createProducer(bootstrapServers, 
acks = -1,
@@ -153,6 +166,8 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
 
   @AfterEach
   override def tearDown(): Unit = {
+    Option(admin).foreach(_.close())
+    admin = null
     producers.values.foreach(_.close())
     consumers.values.foreach(_.close())
     TestUtils.shutdownServers(servers)
@@ -165,7 +180,7 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
     * with acks=-1 to ensure that replication is also working.
     */
   @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
+  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly"))
   def testProduceConsume(quorum: String, groupProtocol: String): Unit = {
     producers.foreach { case (clientMetadata, producer) =>
       val producerRecords = (1 to 10).map(i => new 
ProducerRecord(clientMetadata.topic, s"key$i".getBytes,

Reply via email to