This is an automated email from the ASF dual-hosted git repository. cmccabe pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 9e01a921550 KAFKA-15722: Add KRaft support in RackAwareAutoTopicCreationTest 9e01a921550 is described below commit 9e01a9215506f82e3ce9386816123005e6513fe8 Author: TapDang <89607407+phong260...@users.noreply.github.com> AuthorDate: Mon Nov 11 22:18:12 2024 -0500 KAFKA-15722: Add KRaft support in RackAwareAutoTopicCreationTest Reviewers: Colin P. McCabe <cmcc...@apache.org> --- .../kafka/api/RackAwareAutoTopicCreationTest.scala | 51 ++++++++++++++++------ 1 file changed, 38 insertions(+), 13 deletions(-) diff --git a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala index a8d2431f80b..9bd23de7137 100644 --- a/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala +++ b/core/src/test/scala/integration/kafka/api/RackAwareAutoTopicCreationTest.scala @@ -17,34 +17,54 @@ package kafka.api import java.util.Properties - -import kafka.admin.{RackAwareMode, RackAwareTest} +import kafka.admin.RackAwareTest import kafka.integration.KafkaServerTestHarness import kafka.server.KafkaConfig -import kafka.utils.TestUtils +import kafka.utils.{TestInfoUtils, TestUtils} +import org.apache.kafka.clients.admin.Admin import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs} +import org.apache.kafka.common.network.ListenerName +import org.apache.kafka.common.security.auth.SecurityProtocol import org.junit.jupiter.api.Assertions._ -import org.junit.jupiter.api.Test +import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo} +import org.junit.jupiter.params.ParameterizedTest +import org.junit.jupiter.params.provider.MethodSource + import scala.collection.Map +import scala.jdk.CollectionConverters.ListHasAsScala class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwareTest { val numServers = 4 val numPartitions = 8 val replicationFactor = 2 val overridingProps = new Properties() + var admin: Admin = _ overridingProps.put(ServerLogConfigs.NUM_PARTITIONS_CONFIG, numPartitions.toString) overridingProps.put(ReplicationConfigs.DEFAULT_REPLICATION_FACTOR_CONFIG, replicationFactor.toString) def generateConfigs = (0 until numServers) map { node => - TestUtils.createBrokerConfig(node, zkConnect, enableControlledShutdown = false, rack = Some((node / 2).toString)) + TestUtils.createBrokerConfig(node, null, enableControlledShutdown = false, rack = Some((node / 2).toString)) } map (KafkaConfig.fromProps(_, overridingProps)) private val topic = "topic" - @Test - def testAutoCreateTopic(): Unit = { + @BeforeEach + override def setUp(testInfo: TestInfo): Unit = { + super.setUp(testInfo) + admin = TestUtils.createAdminClient(brokers, ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)) + } + + @AfterEach + override def tearDown(): Unit = { + if (admin != null) admin.close() + super.tearDown() + } + + @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames) + @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll")) + def testAutoCreateTopic(quorum: String, groupProtocol: String): Unit = { val producer = TestUtils.createProducer(bootstrapServers()) try { // Send a message to auto-create the topic @@ -52,15 +72,20 @@ class RackAwareAutoTopicCreationTest extends KafkaServerTestHarness with RackAwa assertEquals(0L, producer.send(record).get.offset, "Should have offset 0") // double check that the topic is created with leader elected - TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic, 0) - val assignment = zkClient.getReplicaAssignmentForTopics(Set(topic)).map { case (topicPartition, replicas) => - topicPartition.partition -> replicas - } - val brokerMetadatas = adminZkClient.getBrokerMetadatas(RackAwareMode.Enforced) + TestUtils.waitUntilLeaderIsElectedOrChangedWithAdmin(admin, topic, 0) + val assignment = getReplicaAssignment(topic) + val brokerMetadatas = brokers.head.metadataCache.getAliveBrokers() val expectedMap = Map(0 -> "0", 1 -> "0", 2 -> "1", 3 -> "1") assertEquals(expectedMap, brokerMetadatas.map(b => b.id -> b.rack.get).toMap) - checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor) + checkReplicaDistribution(assignment, expectedMap, numServers, numPartitions, replicationFactor, + verifyLeaderDistribution = false) } finally producer.close() } + + private def getReplicaAssignment(topic: String): Map[Int, Seq[Int]] = { + TestUtils.describeTopic(admin, topic).partitions.asScala.map { partition => + partition.partition -> partition.replicas.asScala.map(_.id).toSeq + }.toMap + } }