This is an automated email from the ASF dual-hosted git repository.

dengziming pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bf51a50a564 MINOR: KRaft support for Integration Tests (#14295)
bf51a50a564 is described below

commit bf51a50a564ee43d3515c82fc706f17325c4602f
Author: mannoopj <[email protected]>
AuthorDate: Mon Oct 9 04:07:22 2023 -0400

    MINOR: KRaft support for Integration Tests (#14295)
    
    Enable KRaft mode for some producer/fetcher tests.
---
 .../kafka/admin/ListOffsetsIntegrationTest.scala   |  25 +++--
 .../integration/kafka/api/LogAppendTimeTest.scala  |  11 ++-
 .../kafka/api/ProducerFailureHandlingTest.scala    |  74 ++++++++------
 .../MetricsDuringTopicCreationDeletionTest.scala   |  14 +--
 .../kafka/server/FetchRequestMaxBytesTest.scala    |  11 ++-
 .../scala/unit/kafka/server/FetchRequestTest.scala | 110 ++++++++++++---------
 .../unit/kafka/server/OffsetFetchRequestTest.scala |   4 +
 .../unit/kafka/server/ProduceRequestTest.scala     |  32 +++---
 8 files changed, 166 insertions(+), 115 deletions(-)

diff --git 
a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
index bcb9641e9e8..333bd980a60 100644
--- 
a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
+++ 
b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -19,13 +19,15 @@ package kafka.admin
 
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.Utils
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
@@ -51,20 +53,23 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
     super.tearDown()
   }
 
-  @Test
-  def testEarliestOffset(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testEarliestOffset(quorum: String): Unit = {
     val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
     assertEquals(0, earliestOffset.offset())
   }
 
-  @Test
-  def testLatestOffset(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testLatestOffset(quorum: String): Unit = {
     val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
     assertEquals(3, latestOffset.offset())
   }
 
-  @Test
-  def testMaxTimestampOffset(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMaxTimestampOffset(quorum: String): Unit = {
     val maxTimestampOffset = runFetchOffsets(adminClient, 
OffsetSpec.maxTimestamp())
     assertEquals(1, maxTimestampOffset.offset())
   }
@@ -86,10 +91,10 @@ class ListOffsetsIntegrationTest extends 
KafkaServerTestHarness {
       new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
         null, new Array[Byte](10000)),
     )
-    TestUtils.produceMessages(servers, records, -1)
+    TestUtils.produceMessages(brokers, records, -1)
   }
 
   def generateConfigs: Seq[KafkaConfig] =
-    TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
+    TestUtils.createBrokerConfigs(1, 
zkConnectOrNull).map(KafkaConfig.fromProps)
 }
 
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala 
b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
index 6f397d8a46d..ff63cb73b1b 100644
--- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
@@ -19,11 +19,13 @@ package kafka.api
 import java.util.Collections
 import java.util.concurrent.TimeUnit
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.record.TimestampType
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals, 
assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 /**
   * Tests where the broker is configured to use LogAppendTime. For tests where 
LogAppendTime is configured via topic
@@ -46,8 +48,9 @@ class LogAppendTimeTest extends IntegrationTestHarness {
     createTopic(topic)
   }
 
-  @Test
-  def testProduceConsume(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testProduceConsume(quorum: String): Unit = {
     val producer = createProducer()
     val now = System.currentTimeMillis()
     val createTime = now - TimeUnit.DAYS.toMillis(1)
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index f5c0ef8c93b..93d6a460acd 100644
--- 
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++ 
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -21,14 +21,16 @@ import java.util.concurrent.ExecutionException
 import java.util.Properties
 import kafka.integration.KafkaServerTestHarness
 import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.producer._
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.errors._
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 class ProducerFailureHandlingTest extends KafkaServerTestHarness {
   private val producerBufferSize = 30000
@@ -48,7 +50,7 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
   overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
 
   def generateConfigs =
-    TestUtils.createBrokerConfigs(numServers, zkConnect, 
false).map(KafkaConfig.fromProps(_, overridingProps))
+    TestUtils.createBrokerConfigs(numServers, zkConnectOrNull, 
false).map(KafkaConfig.fromProps(_, overridingProps))
 
   private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = _
   private var producer2: KafkaProducer[Array[Byte], Array[Byte]] = _
@@ -83,8 +85,9 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
   /**
    * With ack == 0 the future metadata will have no exceptions with offset -1
    */
-  @Test
-  def testTooLargeRecordWithAckZero(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTooLargeRecordWithAckZero(quorum: String): Unit = {
     // create topic
     createTopic(topic1, replicationFactor = numServers)
 
@@ -100,8 +103,9 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
   /**
    * With ack == 1 the future metadata will throw ExecutionException caused by 
RecordTooLargeException
    */
-  @Test
-  def testTooLargeRecordWithAckOne(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testTooLargeRecordWithAckOne(quorum: String): Unit = {
     // create topic
     createTopic(topic1, replicationFactor = numServers)
 
@@ -118,7 +122,7 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
 
     // create topic
     val topic10 = "topic10"
-    createTopic(topic10, numPartitions = servers.size, replicationFactor = 
numServers, topicConfig)
+    createTopic(topic10, numPartitions = brokers.size, replicationFactor = 
numServers, topicConfig)
 
     // send a record that is too large for replication, but within the broker 
max message limit
     val value = new Array[Byte](maxMessageSize - 
DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD)
@@ -129,22 +133,25 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
   }
 
   /** This should succeed as the replica fetcher thread can handle oversized 
messages since KIP-74 */
-  @Test
-  def testPartitionTooLargeForReplicationWithAckAll(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testPartitionTooLargeForReplicationWithAckAll(quorum: String): Unit = {
     checkTooLargeRecordForReplicationWithAckAll(replicaFetchMaxPartitionBytes)
   }
 
   /** This should succeed as the replica fetcher thread can handle oversized 
messages since KIP-74 */
-  @Test
-  def testResponseTooLargeForReplicationWithAckAll(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testResponseTooLargeForReplicationWithAckAll(quorum: String): Unit = {
     checkTooLargeRecordForReplicationWithAckAll(replicaFetchMaxResponseBytes)
   }
 
   /**
    * With non-exist-topic the future metadata should return ExecutionException 
caused by TimeoutException
    */
-  @Test
-  def testNonExistentTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testNonExistentTopic(quorum: String): Unit = {
     // send a record with non-exist topic
     val record = new ProducerRecord(topic2, null, "key".getBytes, 
"value".getBytes)
     assertThrows(classOf[ExecutionException], () => producer1.send(record).get)
@@ -160,8 +167,9 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
    *    CorruptRecordException
    *    TimeoutException
    */
-  @Test
-  def testWrongBrokerList(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testWrongBrokerList(quorum: String): Unit = {
     // create topic
     createTopic(topic1, replicationFactor = numServers)
 
@@ -177,8 +185,9 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
     * Send with invalid partition id should return ExecutionException caused 
by TimeoutException
     * when partition is higher than the upper bound of partitions.
     */
-  @Test
-  def testInvalidPartition(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testInvalidPartition(quorum: String): Unit = {
     // create topic with a single partition
     createTopic(topic1, numPartitions = 1, replicationFactor = numServers)
 
@@ -191,8 +200,9 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
   /**
    * The send call after producer closed should throw IllegalStateException
    */
-  @Test
-  def testSendAfterClosed(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSendAfterClosed(quorum: String): Unit = {
     // create topic
     createTopic(topic1, replicationFactor = numServers)
 
@@ -211,16 +221,19 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
     assertThrows(classOf[IllegalStateException], () =>  producer3.send(record))
   }
 
-  @Test
-  def testCannotSendToInternalTopic(): Unit = {
-    TestUtils.createOffsetsTopic(zkClient, servers)
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCannotSendToInternalTopic(quorum: String): Unit = {
+
+    createOffsetsTopic()
     val thrown = assertThrows(classOf[ExecutionException],
       () => producer2.send(new ProducerRecord(Topic.GROUP_METADATA_TOPIC_NAME, 
"test".getBytes, "test".getBytes)).get)
     assertTrue(thrown.getCause.isInstanceOf[InvalidTopicException], 
"Unexpected exception while sending to an invalid topic " + thrown.getCause)
   }
 
-  @Test
-  def testNotEnoughReplicas(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testNotEnoughReplicas(quorum: String): Unit = {
     val topicName = "minisrtest"
     val topicProps = new Properties()
     topicProps.put("min.insync.replicas",(numServers+1).toString)
@@ -232,8 +245,9 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
     assertEquals(classOf[NotEnoughReplicasException], e.getCause.getClass)
   }
 
-  @Test
-  def testNotEnoughReplicasAfterBrokerShutdown(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testNotEnoughReplicasAfterBrokerShutdown(quorum: String): Unit = {
     val topicName = "minisrtest2"
     val topicProps = new Properties()
     topicProps.put("min.insync.replicas", numServers.toString)
@@ -245,15 +259,15 @@ class ProducerFailureHandlingTest extends 
KafkaServerTestHarness {
     producer3.send(record).get
 
     // shut down one broker
-    servers.head.shutdown()
-    servers.head.awaitShutdown()
+    brokers.head.shutdown()
+    brokers.head.awaitShutdown()
     val e = assertThrows(classOf[ExecutionException], () => 
producer3.send(record).get)
     assertTrue(e.getCause.isInstanceOf[NotEnoughReplicasException] ||
       e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException] ||
       e.getCause.isInstanceOf[TimeoutException])
 
     // restart the server
-    servers.head.startup()
+    brokers.head.startup()
   }
 
 }
diff --git 
a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
 
b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index 7d363d13b3c..3f0abf68a90 100644
--- 
a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++ 
b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -18,14 +18,15 @@
 package kafka.integration
 
 import java.util.Properties
-
 import kafka.server.KafkaConfig
-import kafka.utils.{Logging, TestUtils}
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
 
 import scala.jdk.CollectionConverters._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
 import com.yammer.metrics.core.Gauge
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness 
with Logging {
 
@@ -48,7 +49,7 @@ class MetricsDuringTopicCreationDeletionTest extends 
KafkaServerTestHarness with
 
   @volatile private var running = true
 
-  override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, 
zkConnect)
+  override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum, 
zkConnectOrNull)
     .map(KafkaConfig.fromProps(_, overridingProps))
 
   @BeforeEach
@@ -67,8 +68,9 @@ class MetricsDuringTopicCreationDeletionTest extends 
KafkaServerTestHarness with
   /*
    * checking all metrics we care in a single test is faster though it would 
be more elegant to have 3 @Test methods
    */
-  @Test
-  def testMetricsDuringTopicCreateDelete(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testMetricsDuringTopicCreateDelete(quorum: String): Unit = {
 
     // For UnderReplicatedPartitions, because of 
https://issues.apache.org/jira/browse/KAFKA-4605
     // we can't access the metrics value of each server. So instead we 
directly invoke the method
diff --git 
a/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala 
b/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
index f5425894bc7..581e6e7dbeb 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
@@ -17,14 +17,16 @@
 
 package kafka.server
 
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import java.util.{Optional, Properties}
 import scala.jdk.CollectionConverters._
@@ -101,8 +103,9 @@ class FetchRequestMaxBytesTest extends BaseRequestTest {
    * Note that when a single batch is larger than FetchMaxBytes, it will be
    * returned in full even if this is larger than FetchMaxBytes.  See KIP-74.
    */
-  @Test
-  def testConsumeMultipleRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testConsumeMultipleRecords(quorum: String): Unit = {
     createTopics()
 
     expectNextRecords(IndexedSeq(messages(0), messages(1)), 0)
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 0034e429b92..f54c082ef48 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -16,7 +16,7 @@
   */
 package kafka.server
 
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -26,7 +26,8 @@ import 
org.apache.kafka.common.serialization.{ByteArraySerializer, StringSeriali
 import org.apache.kafka.common.{IsolationLevel, TopicIdPartition, 
TopicPartition, Uuid}
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 import java.io.DataInputStream
 import java.util
@@ -41,8 +42,9 @@ import scala.util.Random
   */
 class FetchRequestTest extends BaseFetchRequestTest {
 
-  @Test
-  def testBrokerRespectsPartitionsOrderAndSizeLimits(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testBrokerRespectsPartitionsOrderAndSizeLimits(quorum: String): Unit = {
     initProducer()
 
     val messagesPerPartition = 9
@@ -60,7 +62,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
     val topicNames = topicIds.asScala.map(_.swap).asJava
     produceData(topicPartitions, messagesPerPartition)
 
-    val leaderId = servers.head.config.brokerId
+    val leaderId = brokers.head.config.brokerId
     val partitionsForLeader = topicPartitionToLeader.toVector.collect {
       case (tp, partitionLeaderId) if partitionLeaderId == leaderId => tp
     }
@@ -143,8 +145,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
     evaluateResponse4(fetchResponse4V12, 12)
   }
 
-  @Test
-  def testFetchRequestV4WithReadCommitted(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testFetchRequestV4WithReadCommitted(quorum: String): Unit = {
     initProducer()
     val maxPartitionBytes = 200
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions 
= 1).head
@@ -161,18 +164,19 @@ class FetchRequestTest extends BaseFetchRequestTest {
     assertTrue(records(partitionData).map(_.sizeInBytes).sum > 0)
   }
 
-  @Test
-  def testFetchRequestToNonReplica(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testFetchRequestToNonReplica(quorum: String): Unit = {
     val topic = "topic"
     val partition = 0
     val topicPartition = new TopicPartition(topic, partition)
 
     // Create a single-partition topic and find a broker which is not the 
leader
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 
numPartitions = 1, 1, servers)
+    val partitionToLeader = createTopic(topic)
     val topicIds = getTopicIds().asJava
     val topicNames = topicIds.asScala.map(_.swap).asJava
     val leader = partitionToLeader(partition)
-    val nonReplicaOpt = servers.find(_.config.brokerId != leader)
+    val nonReplicaOpt = brokers.find(_.config.brokerId != leader)
     assertTrue(nonReplicaOpt.isDefined)
     val nonReplicaId =  nonReplicaOpt.get.config.brokerId
 
@@ -191,22 +195,24 @@ class FetchRequestTest extends BaseFetchRequestTest {
     assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code, 
oldPartitionData.errorCode)
   }
 
-  @Test
-  def testLastFetchedEpochValidation(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testLastFetchedEpochValidation(quorum: String): Unit = {
     checkLastFetchedEpochValidation(ApiKeys.FETCH.latestVersion())
   }
 
-  @Test
-  def testLastFetchedEpochValidationV12(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testLastFetchedEpochValidationV12(quorum: String): Unit = {
     checkLastFetchedEpochValidation(12)
   }
 
   private def checkLastFetchedEpochValidation(version: Short): Unit = {
     val topic = "topic"
     val topicPartition = new TopicPartition(topic, 0)
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 
numPartitions = 1, replicationFactor = 3, servers)
+    val partitionToLeader = createTopic(topic, replicationFactor = 3)
     val firstLeaderId = partitionToLeader(topicPartition.partition)
-    val firstLeaderEpoch = TestUtils.findLeaderEpoch(firstLeaderId, 
topicPartition, servers)
+    val firstLeaderEpoch = TestUtils.findLeaderEpoch(firstLeaderId, 
topicPartition, brokers)
 
     initProducer()
 
@@ -216,8 +222,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
     // Force a leader change
     killBroker(firstLeaderId)
     // Write some more data in epoch 1
-    val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition, 
firstLeaderId)
-    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, 
topicPartition, servers)
+    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, 
firstLeaderId)
+    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, 
topicPartition, brokers)
     val secondEpochResponses = produceData(Seq(topicPartition), 100)
     val secondEpochEndOffset = secondEpochResponses.lastOption.get.offset + 1
     val topicIds = getTopicIds().asJava
@@ -243,20 +249,22 @@ class FetchRequestTest extends BaseFetchRequestTest {
     assertEquals(firstEpochEndOffset, divergingEpoch.endOffset)
   }
 
-  @Test
-  def testCurrentEpochValidation(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCurrentEpochValidation(quorum: String): Unit = {
     checkCurrentEpochValidation(ApiKeys.FETCH.latestVersion())
   }
 
-  @Test
-  def testCurrentEpochValidationV12(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCurrentEpochValidationV12(quorum: String): Unit = {
     checkCurrentEpochValidation(12)
   }
 
   private def checkCurrentEpochValidation(version: Short): Unit = {
     val topic = "topic"
     val topicPartition = new TopicPartition(topic, 0)
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 
numPartitions = 1, replicationFactor = 3, servers)
+    val partitionToLeader = createTopic(topic, replicationFactor = 3)
     val firstLeaderId = partitionToLeader(topicPartition.partition)
 
     def assertResponseErrorForEpoch(error: Errors, brokerId: Int, leaderEpoch: 
Optional[Integer]): Unit = {
@@ -276,46 +284,48 @@ class FetchRequestTest extends BaseFetchRequestTest {
     killBroker(firstLeaderId)
 
     // Check leader error codes
-    val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition, 
firstLeaderId)
-    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, 
topicPartition, servers)
+    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, 
firstLeaderId)
+    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, 
topicPartition, brokers)
     assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.empty())
     assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, 
Optional.of(secondLeaderEpoch))
     assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, secondLeaderId, 
Optional.of(secondLeaderEpoch - 1))
     assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, secondLeaderId, 
Optional.of(secondLeaderEpoch + 1))
 
     // Check follower error codes
-    val followerId = TestUtils.findFollowerId(topicPartition, servers)
+    val followerId = TestUtils.findFollowerId(topicPartition, brokers)
     assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.empty())
     assertResponseErrorForEpoch(Errors.NONE, followerId, 
Optional.of(secondLeaderEpoch))
     assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, followerId, 
Optional.of(secondLeaderEpoch + 1))
     assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId, 
Optional.of(secondLeaderEpoch - 1))
   }
 
-  @Test
-  def testEpochValidationWithinFetchSession(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testEpochValidationWithinFetchSession(quorum: String): Unit = {
     checkEpochValidationWithinFetchSession(ApiKeys.FETCH.latestVersion())
   }
 
-  @Test
-  def testEpochValidationWithinFetchSessionV12(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testEpochValidationWithinFetchSessionV12(quorum: String): Unit = {
     checkEpochValidationWithinFetchSession(12)
   }
 
   private def checkEpochValidationWithinFetchSession(version: Short): Unit = {
     val topic = "topic"
     val topicPartition = new TopicPartition(topic, 0)
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 
numPartitions = 1, replicationFactor = 3, servers)
+    val partitionToLeader = createTopic(topic, replicationFactor = 3)
     val firstLeaderId = partitionToLeader(topicPartition.partition)
 
     // We need a leader change in order to check epoch fencing since the first 
epoch is 0 and
     // -1 is treated as having no epoch at all
     killBroker(firstLeaderId)
 
-    val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition, 
firstLeaderId)
-    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, 
topicPartition, servers)
+    val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition, 
firstLeaderId)
+    val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, 
topicPartition, brokers)
     verifyFetchSessionErrors(topicPartition, secondLeaderEpoch, 
secondLeaderId, version)
 
-    val followerId = TestUtils.findFollowerId(topicPartition, servers)
+    val followerId = TestUtils.findFollowerId(topicPartition, brokers)
     verifyFetchSessionErrors(topicPartition, secondLeaderEpoch, followerId, 
version)
   }
 
@@ -357,8 +367,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
    * in the server. The client closes its connection after reading partial 
data when the
    * channel is muted in the server. If buffers are not released this will 
result in OOM.
    */
-  @Test
-  def testDownConversionWithConnectionFailure(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDownConversionWithConnectionFailure(quorum: String): Unit = {
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions 
= 1).head
     val topicIds = getTopicIds().asJava
     val topicNames = topicIds.asScala.map(_.swap).asJava
@@ -424,8 +435,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
     * record batch to multiple v0/v1 record batches with size 1. If the fetch 
offset points to inside the record batch,
     * some records have to be dropped during the conversion.
     */
-  @Test
-  def testDownConversionFromBatchedToUnbatchedRespectsOffset(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testDownConversionFromBatchedToUnbatchedRespectsOffset(quorum: String): 
Unit = {
     // Increase linger so that we have control over the batches created
     producer = TestUtils.createProducer(bootstrapServers(),
       retries = 5,
@@ -505,8 +517,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
    * those partitions are returned in all incremental fetch requests.
    * This tests using FetchRequests that don't use topic IDs
    */
-  @Test
-  def testCreateIncrementalFetchWithPartitionsInErrorV12(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCreateIncrementalFetchWithPartitionsInErrorV12(quorum: String): Unit 
= {
     def createConsumerFetchRequest(topicPartitions: Seq[TopicPartition],
                            metadata: JFetchMetadata,
                            toForget: Seq[TopicIdPartition]): FetchRequest =
@@ -564,8 +577,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
   /**
    * Test that when a Fetch Request receives an unknown topic ID, it returns a 
top level error.
    */
-  @Test
-  def testFetchWithPartitionsWithIdError(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testFetchWithPartitionsWithIdError(quorum: String): Unit = {
     def createConsumerFetchRequest(fetchData: 
util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData],
                            metadata: JFetchMetadata,
                            toForget: Seq[TopicIdPartition]): FetchRequest = {
@@ -606,8 +620,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
     assertEquals(Errors.UNKNOWN_TOPIC_ID.code, responseData1.get(bar0).errorCode)
   }
 
-  @Test
-  def testZStdCompressedTopic(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testZStdCompressedTopic(quorum: String): Unit = {
     // ZSTD compressed topic
     val topicConfig = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> BrokerCompressionType.ZSTD.name)
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1, configs = topicConfig).head
@@ -653,8 +668,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
     assertEquals(3, records(data2).size)
   }
 
-  @Test
-  def testZStdCompressedRecords(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testZStdCompressedRecords(quorum: String): Unit = {
     // Producer compressed topic
     val topicConfig = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> BrokerCompressionType.PRODUCER.name)
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1, configs = topicConfig).head
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 200cdd6511a..606aa109d56 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -16,16 +16,20 @@
  */
 package kafka.server
 
+
 import kafka.test.ClusterInstance
 import kafka.test.annotation.{ClusterConfigProperty, ClusterTest, ClusterTestDefaults, Type}
 import kafka.test.junit.ClusterTestExtensions
+
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.message.OffsetFetchResponseData
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+
 import org.junit.jupiter.api.Assertions.{assertEquals, fail}
 import org.junit.jupiter.api.{Tag, Timeout}
 import org.junit.jupiter.api.extension.ExtendWith
 
+
 import scala.jdk.CollectionConverters._
 
 @Timeout(120)
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 6ad22df3221..85a11c20976 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -19,7 +19,7 @@ package kafka.server
 
 import java.nio.ByteBuffer
 import java.util.{Collections, Properties}
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.config.TopicConfig
 import org.apache.kafka.common.message.ProduceRequestData
@@ -29,9 +29,9 @@ import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.record.BrokerCompressionType
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import org.junit.jupiter.params.provider.ValueSource
 
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
@@ -44,8 +44,9 @@ class ProduceRequestTest extends BaseRequestTest {
 
   val metricsKeySet = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
 
-  @Test
-  def testSimpleProduceRequest(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testSimpleProduceRequest(quorum: String): Unit = {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
 
     def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): Unit = {
@@ -128,15 +129,16 @@ class ProduceRequestTest extends BaseRequestTest {
     assertEquals("One or more records have been rejected due to invalid timestamp", partitionProduceResponse.errorMessage)
   }
 
-  @Test
-  def testProduceToNonReplica(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testProduceToNonReplica(quorum: String): Unit = {
     val topic = "topic"
     val partition = 0
 
     // Create a single-partition topic and find a broker which is not the leader
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, 1, servers)
+    val partitionToLeader = createTopic(topic)
     val leader = partitionToLeader(partition)
-    val nonReplicaOpt = servers.find(_.config.brokerId != leader)
+    val nonReplicaOpt = brokers.find(_.config.brokerId != leader)
     assertTrue(nonReplicaOpt.isDefined)
     val nonReplicaId =  nonReplicaOpt.get.config.brokerId
 
@@ -164,14 +166,15 @@ class ProduceRequestTest extends BaseRequestTest {
 
   /* returns a pair of partition id and leader id */
   private def createTopicAndFindPartitionWithLeader(topic: String): (Int, Int) = {
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 3, 2, servers)
+    val partitionToLeader = createTopic(topic, 3, 2)
     partitionToLeader.collectFirst {
       case (partition, leader) if leader != -1 => (partition, leader)
     }.getOrElse(throw new AssertionError(s"No leader elected for topic $topic"))
   }
 
-  @Test
-  def testCorruptLz4ProduceRequest(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testCorruptLz4ProduceRequest(quorum: String): Unit = {
     val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
     val timestamp = 1000000
     val memoryRecords = MemoryRecords.withRecords(CompressionType.LZ4,
@@ -204,15 +207,16 @@ class ProduceRequestTest extends BaseRequestTest {
     assertTrue(TestUtils.meterCount(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}") > 0)
   }
 
-  @Test
-  def testZSTDProduceRequest(): Unit = {
+  @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+  @ValueSource(strings = Array("zk", "kraft"))
+  def testZSTDProduceRequest(quorum: String): Unit = {
     val topic = "topic"
     val partition = 0
 
     // Create a single-partition topic compressed with ZSTD
     val topicConfig = new Properties
     topicConfig.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG, BrokerCompressionType.ZSTD.name)
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1, servers, topicConfig)
+    val partitionToLeader = createTopic(topic, topicConfig =  topicConfig)
     val leader = partitionToLeader(partition)
     val memoryRecords = MemoryRecords.withRecords(CompressionType.ZSTD,
       new SimpleRecord(System.currentTimeMillis(), "key".getBytes, "value".getBytes))


Reply via email to