This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9e01a921550 KAFKA-15722: Add KRaft support in 
RackAwareAutoTopicCreationTest
9e01a921550 is described below

commit 9e01a9215506f82e3ce9386816123005e6513fe8
Author: TapDang <89607407+phong260...@users.noreply.github.com>
AuthorDate: Mon Nov 11 22:18:12 2024 -0500

    KAFKA-15722: Add KRaft support in RackAwareAutoTopicCreationTest
    
    Reviewers: Colin P. McCabe <cmcc...@apache.org>
---
 .../kafka/api/RackAwareAutoTopicCreationTest.scala | 51 ++++++++++++++++------
 1 file changed, 38 insertions(+), 13 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
 
b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
index a8d2431f80b..9bd23de7137 100644
--- 
a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala
@@ -17,34 +17,54 @@
 package kafka.api
 
 import java.util.Properties
-
-import kafka.admin.{RackAwareMode, RackAwareTest}
+import kafka.admin.RackAwareTest
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.admin.Admin
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.MethodSource
+
 import scala.collection.Map
+import scala.jdk.CollectionConverters.ListHasAsScala
 
 class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with 
RackAwareTest {
   val numServers = 4
   val numPartitions = 8
   val replicationFactor = 2
   val overridingProps = new Properties()
+  var admin: Admin = _
   overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, 
numPartitions.toString)
   overridingProps.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, 
replicationFactor.toString)
 
   def generateConfigs =
     (0 until numServers) map { node =>
-      TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = 
false, rack = Some((node / 2).toString))
+      TestUtils.createBrokerConfig(node, null, enableControlledShutdown = 
false, rack = Some((node / 2).toString))
     } map (KafkaConfig.fromProps(_, overridingProps))
 
   private val topic = "topic"
 
-  @Test
-  def testAutoCreateTopic(): Unit = {
+  @BeforeEach
+  override def setUp(testInfo: TestInfo): Unit = {
+    super.setUp(testInfo)
+    admin = TestUtils.createAdminClient(brokers, 
ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT))
+  }
+
+  @AfterEach
+  override def tearDown(): Unit = {
+    if (admin != null) admin.close()
+    super.tearDown()
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAutoCreateTopic(quorum: String, groupProtocol: String): Unit = {
     val producer = TestUtils.createProducer(bootstrapServers())
     try {
       // Send a message to auto-create the topic
@@ -52,15 +72,20 @@ class RackAwareAutoTopicCreationTest extends 
KafkaServerTestHarness with RackAwa
       assertEquals(0L, producer.send(record).get.offset, "Should have offset 
0")
 
       // double check that the topic is created with leader elected
-      TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0)
-      val assignment = zkClient.getReplicaAssignmentForTopics(Set(topic)).map 
{ case (topicPartition, replicas) =>
-        topicPartition.partition -> replicas
-      }
-      val brokerMetadatas = 
adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced)
+      TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic, 0)
+      val assignment = getReplicaAssignment(topic)
+      val brokerMetadatas = brokers.head.metadataCache.getAliveBrokers()
       val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1")
       assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> 
b.rack.get).toMap)
-      checkReplicaDistribution(assignment, expectedMap, numServers, 
numPartitions, replicationFactor)
+      checkReplicaDistribution(assignment, expectedMap, numServers, 
numPartitions, replicationFactor,
+        verifyLeaderDistribution = false)
     } finally producer.close()
   }
+
+  private def getReplicaAssignment(topic: String): Map[Int, Seq[Int]] = {
+    TestUtils.describeTopic(admin, topic).partitions.asScala.map { partition =>
+        partition.partition -> partition.replicas.asScala.map(_.id).toSeq
+    }.toMap
+  }
 }
 

Reply via email to