This is an automated email from the ASF dual-hosted git repository.
dengziming pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bf51a50a564 MINOR: KRaft support for Integration Tests (#14295)
bf51a50a564 is described below
commit bf51a50a564ee43d3515c82fc706f17325c4602f
Author: mannoopj <[email protected]>
AuthorDate: Mon Oct 9 04:07:22 2023 -0400
MINOR: KRaft support for Integration Tests (#14295)
Enable kraft mode for some producer/fetcher tests.
---
.../kafka/admin/ListOffsetsIntegrationTest.scala | 25 +++--
.../integration/kafka/api/LogAppendTimeTest.scala | 11 ++-
.../kafka/api/ProducerFailureHandlingTest.scala | 74 ++++++++------
.../MetricsDuringTopicCreationDeletionTest.scala | 14 +--
.../kafka/server/FetchRequestMaxBytesTest.scala | 11 ++-
.../scala/unit/kafka/server/FetchRequestTest.scala | 110 ++++++++++++---------
.../unit/kafka/server/OffsetFetchRequestTest.scala | 4 +
.../unit/kafka/server/ProduceRequestTest.scala | 32 +++---
8 files changed, 166 insertions(+), 115 deletions(-)
diff --git
a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
index bcb9641e9e8..333bd980a60 100644
---
a/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
+++
b/core/src/test/scala/integration/kafka/admin/ListOffsetsIntegrationTest.scala
@@ -19,13 +19,15 @@ package kafka.admin
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.admin._
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.utils.Utils
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.collection.{Map, Seq}
import scala.jdk.CollectionConverters._
@@ -51,20 +53,23 @@ class ListOffsetsIntegrationTest extends
KafkaServerTestHarness {
super.tearDown()
}
- @Test
- def testEarliestOffset(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testEarliestOffset(quorum: String): Unit = {
val earliestOffset = runFetchOffsets(adminClient, OffsetSpec.earliest())
assertEquals(0, earliestOffset.offset())
}
- @Test
- def testLatestOffset(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testLatestOffset(quorum: String): Unit = {
val latestOffset = runFetchOffsets(adminClient, OffsetSpec.latest())
assertEquals(3, latestOffset.offset())
}
- @Test
- def testMaxTimestampOffset(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testMaxTimestampOffset(quorum: String): Unit = {
val maxTimestampOffset = runFetchOffsets(adminClient,
OffsetSpec.maxTimestamp())
assertEquals(1, maxTimestampOffset.offset())
}
@@ -86,10 +91,10 @@ class ListOffsetsIntegrationTest extends
KafkaServerTestHarness {
new ProducerRecord[Array[Byte], Array[Byte]](topicName, 0, 200L,
null, new Array[Byte](10000)),
)
- TestUtils.produceMessages(servers, records, -1)
+ TestUtils.produceMessages(brokers, records, -1)
}
def generateConfigs: Seq[KafkaConfig] =
- TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
+ TestUtils.createBrokerConfigs(1,
zkConnectOrNull).map(KafkaConfig.fromProps)
}
diff --git a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
index 6f397d8a46d..ff63cb73b1b 100644
--- a/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
+++ b/core/src/test/scala/integration/kafka/api/LogAppendTimeTest.scala
@@ -19,11 +19,13 @@ package kafka.api
import java.util.Collections
import java.util.concurrent.TimeUnit
import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.record.TimestampType
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
import org.junit.jupiter.api.Assertions.{assertEquals, assertNotEquals,
assertTrue}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
/**
* Tests where the broker is configured to use LogAppendTime. For tests where
LogAppendTime is configured via topic
@@ -46,8 +48,9 @@ class LogAppendTimeTest extends IntegrationTestHarness {
createTopic(topic)
}
- @Test
- def testProduceConsume(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testProduceConsume(quorum: String): Unit = {
val producer = createProducer()
val now = System.currentTimeMillis()
val createTime = now - TimeUnit.DAYS.toMillis(1)
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
index f5c0ef8c93b..93d6a460acd 100644
---
a/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
+++
b/core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala
@@ -21,14 +21,16 @@ import java.util.concurrent.ExecutionException
import java.util.Properties
import kafka.integration.KafkaServerTestHarness
import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.errors._
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.record.{DefaultRecord, DefaultRecordBatch}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
class ProducerFailureHandlingTest extends KafkaServerTestHarness {
private val producerBufferSize = 30000
@@ -48,7 +50,7 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
overridingProps.put(KafkaConfig.OffsetsTopicPartitionsProp, 1.toString)
def generateConfigs =
- TestUtils.createBrokerConfigs(numServers, zkConnect,
false).map(KafkaConfig.fromProps(_, overridingProps))
+ TestUtils.createBrokerConfigs(numServers, zkConnectOrNull,
false).map(KafkaConfig.fromProps(_, overridingProps))
private var producer1: KafkaProducer[Array[Byte], Array[Byte]] = _
private var producer2: KafkaProducer[Array[Byte], Array[Byte]] = _
@@ -83,8 +85,9 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
/**
* With ack == 0 the future metadata will have no exceptions with offset -1
*/
- @Test
- def testTooLargeRecordWithAckZero(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testTooLargeRecordWithAckZero(quorum: String): Unit = {
// create topic
createTopic(topic1, replicationFactor = numServers)
@@ -100,8 +103,9 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
/**
* With ack == 1 the future metadata will throw ExecutionException caused by
RecordTooLargeException
*/
- @Test
- def testTooLargeRecordWithAckOne(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testTooLargeRecordWithAckOne(quorum: String): Unit = {
// create topic
createTopic(topic1, replicationFactor = numServers)
@@ -118,7 +122,7 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
// create topic
val topic10 = "topic10"
- createTopic(topic10, numPartitions = servers.size, replicationFactor =
numServers, topicConfig)
+ createTopic(topic10, numPartitions = brokers.size, replicationFactor =
numServers, topicConfig)
// send a record that is too large for replication, but within the broker
max message limit
val value = new Array[Byte](maxMessageSize -
DefaultRecordBatch.RECORD_BATCH_OVERHEAD - DefaultRecord.MAX_RECORD_OVERHEAD)
@@ -129,22 +133,25 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
}
/** This should succeed as the replica fetcher thread can handle oversized
messages since KIP-74 */
- @Test
- def testPartitionTooLargeForReplicationWithAckAll(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testPartitionTooLargeForReplicationWithAckAll(quorum: String): Unit = {
checkTooLargeRecordForReplicationWithAckAll(replicaFetchMaxPartitionBytes)
}
/** This should succeed as the replica fetcher thread can handle oversized
messages since KIP-74 */
- @Test
- def testResponseTooLargeForReplicationWithAckAll(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testResponseTooLargeForReplicationWithAckAll(quorum: String): Unit = {
checkTooLargeRecordForReplicationWithAckAll(replicaFetchMaxResponseBytes)
}
/**
* With non-exist-topic the future metadata should return ExecutionException
caused by TimeoutException
*/
- @Test
- def testNonExistentTopic(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testNonExistentTopic(quorum: String): Unit = {
// send a record with non-exist topic
val record = new ProducerRecord(topic2, null, "key".getBytes,
"value".getBytes)
assertThrows(classOf[ExecutionException], () => producer1.send(record).get)
@@ -160,8 +167,9 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
* CorruptRecordException
* TimeoutException
*/
- @Test
- def testWrongBrokerList(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testWrongBrokerList(quorum: String): Unit = {
// create topic
createTopic(topic1, replicationFactor = numServers)
@@ -177,8 +185,9 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
* Send with invalid partition id should return ExecutionException caused
by TimeoutException
* when partition is higher than the upper bound of partitions.
*/
- @Test
- def testInvalidPartition(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testInvalidPartition(quorum: String): Unit = {
// create topic with a single partition
createTopic(topic1, numPartitions = 1, replicationFactor = numServers)
@@ -191,8 +200,9 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
/**
* The send call after producer closed should throw IllegalStateException
*/
- @Test
- def testSendAfterClosed(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSendAfterClosed(quorum: String): Unit = {
// create topic
createTopic(topic1, replicationFactor = numServers)
@@ -211,16 +221,19 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
assertThrows(classOf[IllegalStateException], () => producer3.send(record))
}
- @Test
- def testCannotSendToInternalTopic(): Unit = {
- TestUtils.createOffsetsTopic(zkClient, servers)
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCannotSendToInternalTopic(quorum: String): Unit = {
+
+ createOffsetsTopic()
val thrown = assertThrows(classOf[ExecutionException],
() => producer2.send(new ProducerRecord(Topic.GROUP_METADATA_TOPIC_NAME,
"test".getBytes, "test".getBytes)).get)
assertTrue(thrown.getCause.isInstanceOf[InvalidTopicException],
"Unexpected exception while sending to an invalid topic " + thrown.getCause)
}
- @Test
- def testNotEnoughReplicas(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testNotEnoughReplicas(quorum: String): Unit = {
val topicName = "minisrtest"
val topicProps = new Properties()
topicProps.put("min.insync.replicas",(numServers+1).toString)
@@ -232,8 +245,9 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
assertEquals(classOf[NotEnoughReplicasException], e.getCause.getClass)
}
- @Test
- def testNotEnoughReplicasAfterBrokerShutdown(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testNotEnoughReplicasAfterBrokerShutdown(quorum: String): Unit = {
val topicName = "minisrtest2"
val topicProps = new Properties()
topicProps.put("min.insync.replicas", numServers.toString)
@@ -245,15 +259,15 @@ class ProducerFailureHandlingTest extends
KafkaServerTestHarness {
producer3.send(record).get
// shut down one broker
- servers.head.shutdown()
- servers.head.awaitShutdown()
+ brokers.head.shutdown()
+ brokers.head.awaitShutdown()
val e = assertThrows(classOf[ExecutionException], () =>
producer3.send(record).get)
assertTrue(e.getCause.isInstanceOf[NotEnoughReplicasException] ||
e.getCause.isInstanceOf[NotEnoughReplicasAfterAppendException] ||
e.getCause.isInstanceOf[TimeoutException])
// restart the server
- servers.head.startup()
+ brokers.head.startup()
}
}
diff --git
a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
index 7d363d13b3c..3f0abf68a90 100644
---
a/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
+++
b/core/src/test/scala/unit/kafka/integration/MetricsDuringTopicCreationDeletionTest.scala
@@ -18,14 +18,15 @@
package kafka.integration
import java.util.Properties
-
import kafka.server.KafkaConfig
-import kafka.utils.{Logging, TestUtils}
+import kafka.utils.{Logging, TestInfoUtils, TestUtils}
import scala.jdk.CollectionConverters._
-import org.junit.jupiter.api.{BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{BeforeEach, TestInfo}
import com.yammer.metrics.core.Gauge
import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
class MetricsDuringTopicCreationDeletionTest extends KafkaServerTestHarness
with Logging {
@@ -48,7 +49,7 @@ class MetricsDuringTopicCreationDeletionTest extends
KafkaServerTestHarness with
@volatile private var running = true
- override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum,
zkConnect)
+ override def generateConfigs = TestUtils.createBrokerConfigs(nodesNum,
zkConnectOrNull)
.map(KafkaConfig.fromProps(_, overridingProps))
@BeforeEach
@@ -67,8 +68,9 @@ class MetricsDuringTopicCreationDeletionTest extends
KafkaServerTestHarness with
/*
* checking all metrics we care in a single test is faster though it would
be more elegant to have 3 @Test methods
*/
- @Test
- def testMetricsDuringTopicCreateDelete(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testMetricsDuringTopicCreateDelete(quorum: String): Unit = {
// For UnderReplicatedPartitions, because of
https://issues.apache.org/jira/browse/KAFKA-4605
// we can't access the metrics value of each server. So instead we
directly invoke the method
diff --git
a/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
b/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
index f5425894bc7..581e6e7dbeb 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestMaxBytesTest.scala
@@ -17,14 +17,16 @@
package kafka.server
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{TopicPartition, Uuid}
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.{AfterEach, BeforeEach, Test, TestInfo}
+import org.junit.jupiter.api.{AfterEach, BeforeEach, TestInfo}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import java.util.{Optional, Properties}
import scala.jdk.CollectionConverters._
@@ -101,8 +103,9 @@ class FetchRequestMaxBytesTest extends BaseRequestTest {
* Note that when a single batch is larger than FetchMaxBytes, it will be
* returned in full even if this is larger than FetchMaxBytes. See KIP-74.
*/
- @Test
- def testConsumeMultipleRecords(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testConsumeMultipleRecords(quorum: String): Unit = {
createTopics()
expectNextRecords(IndexedSeq(messages(0), messages(1)), 0)
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 0034e429b92..f54c082ef48 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.server
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
@@ -26,7 +26,8 @@ import
org.apache.kafka.common.serialization.{ByteArraySerializer, StringSeriali
import org.apache.kafka.common.{IsolationLevel, TopicIdPartition,
TopicPartition, Uuid}
import org.apache.kafka.server.record.BrokerCompressionType
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import java.io.DataInputStream
import java.util
@@ -41,8 +42,9 @@ import scala.util.Random
*/
class FetchRequestTest extends BaseFetchRequestTest {
- @Test
- def testBrokerRespectsPartitionsOrderAndSizeLimits(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testBrokerRespectsPartitionsOrderAndSizeLimits(quorum: String): Unit = {
initProducer()
val messagesPerPartition = 9
@@ -60,7 +62,7 @@ class FetchRequestTest extends BaseFetchRequestTest {
val topicNames = topicIds.asScala.map(_.swap).asJava
produceData(topicPartitions, messagesPerPartition)
- val leaderId = servers.head.config.brokerId
+ val leaderId = brokers.head.config.brokerId
val partitionsForLeader = topicPartitionToLeader.toVector.collect {
case (tp, partitionLeaderId) if partitionLeaderId == leaderId => tp
}
@@ -143,8 +145,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
evaluateResponse4(fetchResponse4V12, 12)
}
- @Test
- def testFetchRequestV4WithReadCommitted(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testFetchRequestV4WithReadCommitted(quorum: String): Unit = {
initProducer()
val maxPartitionBytes = 200
val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions
= 1).head
@@ -161,18 +164,19 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertTrue(records(partitionData).map(_.sizeInBytes).sum > 0)
}
- @Test
- def testFetchRequestToNonReplica(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testFetchRequestToNonReplica(quorum: String): Unit = {
val topic = "topic"
val partition = 0
val topicPartition = new TopicPartition(topic, partition)
// Create a single-partition topic and find a broker which is not the
leader
- val partitionToLeader = TestUtils.createTopic(zkClient, topic,
numPartitions = 1, 1, servers)
+ val partitionToLeader = createTopic(topic)
val topicIds = getTopicIds().asJava
val topicNames = topicIds.asScala.map(_.swap).asJava
val leader = partitionToLeader(partition)
- val nonReplicaOpt = servers.find(_.config.brokerId != leader)
+ val nonReplicaOpt = brokers.find(_.config.brokerId != leader)
assertTrue(nonReplicaOpt.isDefined)
val nonReplicaId = nonReplicaOpt.get.config.brokerId
@@ -191,22 +195,24 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(Errors.NOT_LEADER_OR_FOLLOWER.code,
oldPartitionData.errorCode)
}
- @Test
- def testLastFetchedEpochValidation(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testLastFetchedEpochValidation(quorum: String): Unit = {
checkLastFetchedEpochValidation(ApiKeys.FETCH.latestVersion())
}
- @Test
- def testLastFetchedEpochValidationV12(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testLastFetchedEpochValidationV12(quorum: String): Unit = {
checkLastFetchedEpochValidation(12)
}
private def checkLastFetchedEpochValidation(version: Short): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition(topic, 0)
- val partitionToLeader = TestUtils.createTopic(zkClient, topic,
numPartitions = 1, replicationFactor = 3, servers)
+ val partitionToLeader = createTopic(topic, replicationFactor = 3)
val firstLeaderId = partitionToLeader(topicPartition.partition)
- val firstLeaderEpoch = TestUtils.findLeaderEpoch(firstLeaderId,
topicPartition, servers)
+ val firstLeaderEpoch = TestUtils.findLeaderEpoch(firstLeaderId,
topicPartition, brokers)
initProducer()
@@ -216,8 +222,8 @@ class FetchRequestTest extends BaseFetchRequestTest {
// Force a leader change
killBroker(firstLeaderId)
// Write some more data in epoch 1
- val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition,
firstLeaderId)
- val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId,
topicPartition, servers)
+ val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition,
firstLeaderId)
+ val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId,
topicPartition, brokers)
val secondEpochResponses = produceData(Seq(topicPartition), 100)
val secondEpochEndOffset = secondEpochResponses.lastOption.get.offset + 1
val topicIds = getTopicIds().asJava
@@ -243,20 +249,22 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(firstEpochEndOffset, divergingEpoch.endOffset)
}
- @Test
- def testCurrentEpochValidation(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCurrentEpochValidation(quorum: String): Unit = {
checkCurrentEpochValidation(ApiKeys.FETCH.latestVersion())
}
- @Test
- def testCurrentEpochValidationV12(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCurrentEpochValidationV12(quorum: String): Unit = {
checkCurrentEpochValidation(12)
}
private def checkCurrentEpochValidation(version: Short): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition(topic, 0)
- val partitionToLeader = TestUtils.createTopic(zkClient, topic,
numPartitions = 1, replicationFactor = 3, servers)
+ val partitionToLeader = createTopic(topic, replicationFactor = 3)
val firstLeaderId = partitionToLeader(topicPartition.partition)
def assertResponseErrorForEpoch(error: Errors, brokerId: Int, leaderEpoch:
Optional[Integer]): Unit = {
@@ -276,46 +284,48 @@ class FetchRequestTest extends BaseFetchRequestTest {
killBroker(firstLeaderId)
// Check leader error codes
- val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition,
firstLeaderId)
- val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId,
topicPartition, servers)
+ val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition,
firstLeaderId)
+ val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId,
topicPartition, brokers)
assertResponseErrorForEpoch(Errors.NONE, secondLeaderId, Optional.empty())
assertResponseErrorForEpoch(Errors.NONE, secondLeaderId,
Optional.of(secondLeaderEpoch))
assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, secondLeaderId,
Optional.of(secondLeaderEpoch - 1))
assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, secondLeaderId,
Optional.of(secondLeaderEpoch + 1))
// Check follower error codes
- val followerId = TestUtils.findFollowerId(topicPartition, servers)
+ val followerId = TestUtils.findFollowerId(topicPartition, brokers)
assertResponseErrorForEpoch(Errors.NONE, followerId, Optional.empty())
assertResponseErrorForEpoch(Errors.NONE, followerId,
Optional.of(secondLeaderEpoch))
assertResponseErrorForEpoch(Errors.UNKNOWN_LEADER_EPOCH, followerId,
Optional.of(secondLeaderEpoch + 1))
assertResponseErrorForEpoch(Errors.FENCED_LEADER_EPOCH, followerId,
Optional.of(secondLeaderEpoch - 1))
}
- @Test
- def testEpochValidationWithinFetchSession(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testEpochValidationWithinFetchSession(quorum: String): Unit = {
checkEpochValidationWithinFetchSession(ApiKeys.FETCH.latestVersion())
}
- @Test
- def testEpochValidationWithinFetchSessionV12(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testEpochValidationWithinFetchSessionV12(quorum: String): Unit = {
checkEpochValidationWithinFetchSession(12)
}
private def checkEpochValidationWithinFetchSession(version: Short): Unit = {
val topic = "topic"
val topicPartition = new TopicPartition(topic, 0)
- val partitionToLeader = TestUtils.createTopic(zkClient, topic,
numPartitions = 1, replicationFactor = 3, servers)
+ val partitionToLeader = createTopic(topic, replicationFactor = 3)
val firstLeaderId = partitionToLeader(topicPartition.partition)
// We need a leader change in order to check epoch fencing since the first
epoch is 0 and
// -1 is treated as having no epoch at all
killBroker(firstLeaderId)
- val secondLeaderId = TestUtils.awaitLeaderChange(servers, topicPartition,
firstLeaderId)
- val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId,
topicPartition, servers)
+ val secondLeaderId = TestUtils.awaitLeaderChange(brokers, topicPartition,
firstLeaderId)
+ val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId,
topicPartition, brokers)
verifyFetchSessionErrors(topicPartition, secondLeaderEpoch,
secondLeaderId, version)
- val followerId = TestUtils.findFollowerId(topicPartition, servers)
+ val followerId = TestUtils.findFollowerId(topicPartition, brokers)
verifyFetchSessionErrors(topicPartition, secondLeaderEpoch, followerId,
version)
}
@@ -357,8 +367,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
* in the server. The client closes its connection after reading partial
data when the
* channel is muted in the server. If buffers are not released this will
result in OOM.
*/
- @Test
- def testDownConversionWithConnectionFailure(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDownConversionWithConnectionFailure(quorum: String): Unit = {
val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions
= 1).head
val topicIds = getTopicIds().asJava
val topicNames = topicIds.asScala.map(_.swap).asJava
@@ -424,8 +435,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
* record batch to multiple v0/v1 record batches with size 1. If the fetch
offset points to inside the record batch,
* some records have to be dropped during the conversion.
*/
- @Test
- def testDownConversionFromBatchedToUnbatchedRespectsOffset(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testDownConversionFromBatchedToUnbatchedRespectsOffset(quorum: String):
Unit = {
// Increase linger so that we have control over the batches created
producer = TestUtils.createProducer(bootstrapServers(),
retries = 5,
@@ -505,8 +517,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
* those partitions are returned in all incremental fetch requests.
* This tests using FetchRequests that don't use topic IDs
*/
- @Test
- def testCreateIncrementalFetchWithPartitionsInErrorV12(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCreateIncrementalFetchWithPartitionsInErrorV12(quorum: String): Unit
= {
def createConsumerFetchRequest(topicPartitions: Seq[TopicPartition],
metadata: JFetchMetadata,
toForget: Seq[TopicIdPartition]): FetchRequest =
@@ -564,8 +577,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
/**
* Test that when a Fetch Request receives an unknown topic ID, it returns a
top level error.
*/
- @Test
- def testFetchWithPartitionsWithIdError(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testFetchWithPartitionsWithIdError(quorum: String): Unit = {
def createConsumerFetchRequest(fetchData:
util.LinkedHashMap[TopicPartition, FetchRequest.PartitionData],
metadata: JFetchMetadata,
toForget: Seq[TopicIdPartition]): FetchRequest = {
@@ -606,8 +620,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(Errors.UNKNOWN_TOPIC_ID.code,
responseData1.get(bar0).errorCode)
}
- @Test
- def testZStdCompressedTopic(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testZStdCompressedTopic(quorum: String): Unit = {
// ZSTD compressed topic
val topicConfig = Map(TopicConfig.COMPRESSION_TYPE_CONFIG ->
BrokerCompressionType.ZSTD.name)
val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions
= 1, configs = topicConfig).head
@@ -653,8 +668,9 @@ class FetchRequestTest extends BaseFetchRequestTest {
assertEquals(3, records(data2).size)
}
- @Test
- def testZStdCompressedRecords(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testZStdCompressedRecords(quorum: String): Unit = {
// Producer compressed topic
val topicConfig = Map(TopicConfig.COMPRESSION_TYPE_CONFIG ->
BrokerCompressionType.PRODUCER.name)
val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions
= 1, configs = topicConfig).head
diff --git a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
index 200cdd6511a..606aa109d56 100644
--- a/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/OffsetFetchRequestTest.scala
@@ -16,16 +16,20 @@
*/
package kafka.server
+
import kafka.test.ClusterInstance
import kafka.test.annotation.{ClusterConfigProperty, ClusterTest,
ClusterTestDefaults, Type}
import kafka.test.junit.ClusterTestExtensions
+
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.message.OffsetFetchResponseData
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+
import org.junit.jupiter.api.Assertions.{assertEquals, fail}
import org.junit.jupiter.api.{Tag, Timeout}
import org.junit.jupiter.api.extension.ExtendWith
+
import scala.jdk.CollectionConverters._
@Timeout(120)
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index 6ad22df3221..85a11c20976 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -19,7 +19,7 @@ package kafka.server
import java.nio.ByteBuffer
import java.util.{Collections, Properties}
-import kafka.utils.TestUtils
+import kafka.utils.{TestInfoUtils, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.message.ProduceRequestData
@@ -29,9 +29,9 @@ import org.apache.kafka.common.requests.{ProduceRequest,
ProduceResponse}
import org.apache.kafka.server.metrics.KafkaYammerMetrics
import org.apache.kafka.server.record.BrokerCompressionType
import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
+import org.junit.jupiter.params.provider.ValueSource
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@@ -44,8 +44,9 @@ class ProduceRequestTest extends BaseRequestTest {
val metricsKeySet =
KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
- @Test
- def testSimpleProduceRequest(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testSimpleProduceRequest(quorum: String): Unit = {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
def sendAndCheck(memoryRecords: MemoryRecords, expectedOffset: Long): Unit
= {
@@ -128,15 +129,16 @@ class ProduceRequestTest extends BaseRequestTest {
assertEquals("One or more records have been rejected due to invalid
timestamp", partitionProduceResponse.errorMessage)
}
- @Test
- def testProduceToNonReplica(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testProduceToNonReplica(quorum: String): Unit = {
val topic = "topic"
val partition = 0
// Create a single-partition topic and find a broker which is not the
leader
- val partitionToLeader = TestUtils.createTopic(zkClient, topic,
numPartitions = 1, 1, servers)
+ val partitionToLeader = createTopic(topic)
val leader = partitionToLeader(partition)
- val nonReplicaOpt = servers.find(_.config.brokerId != leader)
+ val nonReplicaOpt = brokers.find(_.config.brokerId != leader)
assertTrue(nonReplicaOpt.isDefined)
val nonReplicaId = nonReplicaOpt.get.config.brokerId
@@ -164,14 +166,15 @@ class ProduceRequestTest extends BaseRequestTest {
/* returns a pair of partition id and leader id */
private def createTopicAndFindPartitionWithLeader(topic: String): (Int, Int)
= {
- val partitionToLeader = TestUtils.createTopic(zkClient, topic, 3, 2,
servers)
+ val partitionToLeader = createTopic(topic, 3, 2)
partitionToLeader.collectFirst {
case (partition, leader) if leader != -1 => (partition, leader)
}.getOrElse(throw new AssertionError(s"No leader elected for topic
$topic"))
}
- @Test
- def testCorruptLz4ProduceRequest(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testCorruptLz4ProduceRequest(quorum: String): Unit = {
val (partition, leader) = createTopicAndFindPartitionWithLeader("topic")
val timestamp = 1000000
val memoryRecords = MemoryRecords.withRecords(CompressionType.LZ4,
@@ -204,15 +207,16 @@ class ProduceRequestTest extends BaseRequestTest {
assertTrue(TestUtils.meterCount(s"${BrokerTopicStats.InvalidMessageCrcRecordsPerSec}")
> 0)
}
- @Test
- def testZSTDProduceRequest(): Unit = {
+ @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedQuorumName)
+ @ValueSource(strings = Array("zk", "kraft"))
+ def testZSTDProduceRequest(quorum: String): Unit = {
val topic = "topic"
val partition = 0
// Create a single-partition topic compressed with ZSTD
val topicConfig = new Properties
topicConfig.setProperty(TopicConfig.COMPRESSION_TYPE_CONFIG,
BrokerCompressionType.ZSTD.name)
- val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1,
servers, topicConfig)
+ val partitionToLeader = createTopic(topic, topicConfig = topicConfig)
val leader = partitionToLeader(partition)
val memoryRecords = MemoryRecords.withRecords(CompressionType.ZSTD,
new SimpleRecord(System.currentTimeMillis(), "key".getBytes,
"value".getBytes))