This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 401b347a775 MINOR: convert ProduceRequestTest to KRaft (#17780)
401b347a775 is described below

commit 401b347a77555e01c42be7450bfaaf6ff784f8fc
Author: Colin Patrick McCabe <cmcc...@apache.org>
AuthorDate: Thu Nov 14 11:13:27 2024 -0800

    MINOR: convert ProduceRequestTest to KRaft (#17780)
    
    Reviewers: Justine Olshan <justineols...@gmail.com>
---
 .../unit/kafka/server/ProduceRequestTest.scala     | 42 ++++++++++++++++++++--
 1 file changed, 39 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index cb781589d35..4b952674429 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -20,6 +20,7 @@ package kafka.server
 import java.nio.ByteBuffer
 import java.util.{Collections, Properties}
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{Admin, TopicDescription}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
@@ -35,6 +36,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.concurrent.TimeUnit
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
@@ -84,14 +86,41 @@ class ProduceRequestTest extends BaseRequestTest {
      new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
   }
 
-  @ParameterizedTest
+  private def getPartitionToLeader(
+    admin: Admin,
+    topic: String
+  ): Map[Int, Int] = {
+    var topicDescription: TopicDescription = null
+    TestUtils.waitUntilTrue(() => {
+      val topicMap = admin.
+        describeTopics(java.util.Arrays.asList(topic)).
+          allTopicNames().get(10, TimeUnit.MINUTES)
+      topicDescription = topicMap.get(topic)
+      topicDescription != null
+    }, "Timed out waiting to describe topic " + topic)
+    topicDescription.partitions().asScala.map(p => {
+      p.partition() -> p.leader().id()
+    }).toMap
+  }
+
+  @ParameterizedTest(name = "quorum=kraft")
   @MethodSource(Array("timestampConfigProvider"))
   def testProduceWithInvalidTimestamp(messageTimeStampConfig: String, recordTimestamp: Long): Unit = {
     val topic = "topic"
     val partition = 0
     val topicConfig = new Properties
     topicConfig.setProperty(messageTimeStampConfig, "1000")
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1, servers, topicConfig)
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin = admin,
+      topic = topic,
+      brokers = brokers,
+      controllers = controllerServers,
+      numPartitions = 1,
+      replicationFactor = 1,
+      topicConfig = topicConfig
+    )
+    val partitionToLeader = getPartitionToLeader(admin, topic)
     val leader = partitionToLeader(partition)
 
     def createRecords(magicValue: Byte, timestamp: Long, codec: Compression): MemoryRecords = {
@@ -138,7 +167,14 @@ class ProduceRequestTest extends BaseRequestTest {
     val partition = 0
 
     // Create a single-partition topic and find a broker which is not the leader
-    val partitionToLeader = createTopic(topic)
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin = admin,
+      topic = topic,
+      brokers = brokers,
+      controllers = controllerServers
+    )
+    val partitionToLeader = getPartitionToLeader(admin, topic)
     val leader = partitionToLeader(partition)
     val nonReplicaOpt = brokers.find(_.config.brokerId != leader)
     assertTrue(nonReplicaOpt.isDefined)

Reply via email to