This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
     new 401b347a775 MINOR: convert ProduceRequestTest to KRaft (#17780)
401b347a775 is described below

commit 401b347a77555e01c42be7450bfaaf6ff784f8fc
Author: Colin Patrick McCabe <cmcc...@apache.org>
AuthorDate: Thu Nov 14 11:13:27 2024 -0800

    MINOR: convert ProduceRequestTest to KRaft (#17780)

    Reviewers: Justine Olshan <justineols...@gmail.com>
---
 .../unit/kafka/server/ProduceRequestTest.scala     | 42 ++++++++++++++++++++--
 1 file changed, 39 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
index cb781589d35..4b952674429 100644
--- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala
@@ -20,6 +20,7 @@ package kafka.server
 import java.nio.ByteBuffer
 import java.util.{Collections, Properties}
 import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{Admin, TopicDescription}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.compress.Compression
 import org.apache.kafka.common.config.TopicConfig
@@ -35,6 +36,7 @@ import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 import org.junit.jupiter.params.provider.ValueSource
 
+import java.util.concurrent.TimeUnit
 import scala.annotation.nowarn
 import scala.jdk.CollectionConverters._
 
@@ -84,14 +86,41 @@ class ProduceRequestTest extends BaseRequestTest {
       new SimpleRecord(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1)
   }
 
-  @ParameterizedTest
+  private def getPartitionToLeader(
+    admin: Admin,
+    topic: String
+  ): Map[Int, Int] = {
+    var topicDescription: TopicDescription = null
+    TestUtils.waitUntilTrue(() => {
+      val topicMap = admin.
+        describeTopics(java.util.Arrays.asList(topic)).
+        allTopicNames().get(10, TimeUnit.MINUTES)
+      topicDescription = topicMap.get(topic)
+      topicDescription != null
+    }, "Timed out waiting to describe topic " + topic)
+    topicDescription.partitions().asScala.map(p => {
+      p.partition() -> p.leader().id()
+    }).toMap
+  }
+
+  @ParameterizedTest(name = "quorum=kraft")
   @MethodSource(Array("timestampConfigProvider"))
   def testProduceWithInvalidTimestamp(messageTimeStampConfig: String, recordTimestamp: Long): Unit = {
     val topic = "topic"
     val partition = 0
     val topicConfig = new Properties
     topicConfig.setProperty(messageTimeStampConfig, "1000")
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, 1, 1, servers, topicConfig)
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin = admin,
+      topic = topic,
+      brokers = brokers,
+      controllers = controllerServers,
+      numPartitions = 1,
+      replicationFactor = 1,
+      topicConfig = topicConfig
+    )
+    val partitionToLeader = getPartitionToLeader(admin, topic)
     val leader = partitionToLeader(partition)
 
     def createRecords(magicValue: Byte, timestamp: Long, codec: Compression): MemoryRecords = {
@@ -138,7 +167,14 @@ class ProduceRequestTest extends BaseRequestTest {
     val partition = 0
 
     // Create a single-partition topic and find a broker which is not the leader
-    val partitionToLeader = createTopic(topic)
+    val admin = createAdminClient()
+    TestUtils.createTopicWithAdmin(
+      admin = admin,
+      topic = topic,
+      brokers = brokers,
+      controllers = controllerServers
+    )
+    val partitionToLeader = getPartitionToLeader(admin, topic)
     val leader = partitionToLeader(partition)
     val nonReplicaOpt = brokers.find(_.config.brokerId != leader)
     assertTrue(nonReplicaOpt.isDefined)