This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 401b347a775 MINOR: convert ProduceRequestTest to KRaft (#17780)
401b347a775 is described below
commit 401b347a77555e01c42be7450bfaaf6ff784f8fc
Author: Colin Patrick McCabe <[email protected]>
AuthorDate: Thu Nov 14 11:13:27 2024 -0800
MINOR: convert ProduceRequestTest to KRaft (#17780)
Reviewers: Justine Olshan <[email protected]>
---
.../unit/kafka/server/ProduceRequestTest.scala | 42 ++++++++++++++++++++--
1 file changed, 39 insertions(+), 3 deletions(-)
diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index cb781589d35..4b952674429 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -20,6 +20,7 @@ package kafka.server
import java.nio.ByteBuffer
import java.util.{Collections, Properties}
import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{Admin, TopicDescription}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
@@ -35,6 +36,7 @@ import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{Arguments, MethodSource}
import org.junit.jupiter.params.provider.ValueSource
+import java.util.concurrent.TimeUnit
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@@ -84,14 +86,41 @@ class ProduceRequestTest extends BaseRequestTest {
new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
}
- @ParameterizedTest
+ private def getPartitionToLeader(
+ admin: Admin,
+ topic: String
+ ): Map[Int, Int] = {
+ var topicDescription: TopicDescription = null
+ TestUtils.waitUntilTrue(() => {
+ val topicMap = admin.
+ describeTopics(java.util.Arrays.asList(topic)).
+ allTopicNames().get(10, TimeUnit.MINUTES)
+ topicDescription = topicMap.get(topic)
+ topicDescription != null
+ }, "Timed out waiting to describe topic " + topic)
+ topicDescription.partitions().asScala.map(p => {
+ p.partition() -> p.leader().id()
+ }).toMap
+ }
+
+ @ParameterizedTest(name = "quorum=kraft")
@MethodSource(Array("timestampConfigProvider"))
def testProduceWithInvalidTimestamp(messageTimeStampConfig: String,
recordTimestamp: Long): Unit = {
val topic = "topic"
val partition = 0
val topicConfig = new Properties
topicConfig.setProperty(messageTimeStampConfig, "1000")
- val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1, servers, topicConfig)
+ val admin = createAdminClient()
+ TestUtils.createTopicWithAdmin(
+ admin = admin,
+ topic = topic,
+ brokers = brokers,
+ controllers = controllerServers,
+ numPartitions = 1,
+ replicationFactor = 1,
+ topicConfig = topicConfig
+ )
+ val partitionToLeader = getPartitionToLeader(admin, topic)
val leader = partitionToLeader(partition)
def createRecords(magicValue: Byte, timestamp: Long, codec: Compression): MemoryRecords = {
@@ -138,7 +167,14 @@ class ProduceRequestTest extends BaseRequestTest {
val partition = 0
// Create a single-partition topic and find a broker which is not the leader
- val partitionToLeader = createTopic(topic)
+ val admin = createAdminClient()
+ TestUtils.createTopicWithAdmin(
+ admin = admin,
+ topic = topic,
+ brokers = brokers,
+ controllers = controllerServers
+ )
+ val partitionToLeader = getPartitionToLeader(admin, topic)
val leader = partitionToLeader(partition)
val nonReplicaOpt = brokers.find(_.config.brokerId != leader)
assertTrue(nonReplicaOpt.isDefined)