This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new 437ebb941e1 KAFKA-15729 Add KRaft support in GetOffsetShellTest (#15489) 437ebb941e1 is described below commit 437ebb941e1485a6f1ad7afbba3814bfa310c9cf Author: Owen Leung <owen.leu...@gmail.com> AuthorDate: Sun Apr 14 21:13:49 2024 +0800 KAFKA-15729 Add KRaft support in GetOffsetShellTest (#15489) Reviewers: Chia-Ping Tsai <chia7...@gmail.com> --- .../org/apache/kafka/tools/GetOffsetShellTest.java | 105 ++++++++++++++------- 1 file changed, 73 insertions(+), 32 deletions(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java index dad41342b7b..231222ec6a3 100644 --- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.tools; import kafka.test.ClusterInstance; +import kafka.test.annotation.ClusterConfigProperty; import kafka.test.annotation.ClusterTest; import kafka.test.annotation.ClusterTestDefaults; import kafka.test.annotation.Type; @@ -25,16 +26,19 @@ import kafka.test.junit.ClusterTestExtensions; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.admin.Admin; import org.apache.kafka.clients.admin.NewTopic; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.common.utils.Exit; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.extension.ExtendWith; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -46,33 +50,29 @@ import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; @ExtendWith(value = ClusterTestExtensions.class) -@ClusterTestDefaults(clusterType = Type.ZK) +@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = { + @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"), + @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = "1"), + @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "4") +}) @Tag("integration") public class GetOffsetShellTest { private final int topicCount = 4; - private final int offsetTopicPartitionCount = 4; private final ClusterInstance cluster; - private final String topicName = "topic"; public GetOffsetShellTest(ClusterInstance cluster) { this.cluster = cluster; } private String getTopicName(int i) { - return topicName + i; - } - - @BeforeEach - public void before() { - cluster.config().serverProperties().put("auto.create.topics.enable", false); - cluster.config().serverProperties().put("offsets.topic.replication.factor", "1"); - cluster.config().serverProperties().put("offsets.topic.num.partitions", String.valueOf(offsetTopicPartitionCount)); + return "topic" + i; } private void setUp() { - try (Admin admin = Admin.create(cluster.config().adminClientProperties())) { + try (Admin admin = cluster.createAdminClient()) { List<NewTopic> topics = new ArrayList<>(); IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new NewTopic(getTopicName(i), i, (short) 1))); @@ -81,28 +81,53 @@ public class GetOffsetShellTest { } Properties props = new Properties(); - props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.config().producerProperties().get("bootstrap.servers")); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) { IntStream.range(0, topicCount + 1) .forEach(i -> IntStream.range(0, i * i) - .forEach(msgCount -> producer.send( - new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount))) + .forEach(msgCount -> { + assertDoesNotThrow(() -> producer.send( + new ProducerRecord<>(getTopicName(i), msgCount % i, null, "val" + msgCount)).get()); + }) ); } } + private void createConsumerAndPoll() { + Properties props = new Properties(); + props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers()); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group"); + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { + List<String> topics = new ArrayList<>(); + for (int i = 0; i < topicCount + 1; i++) { + topics.add(getTopicName(i)); + } + consumer.subscribe(topics); + consumer.poll(Duration.ofMillis(1000)); + } + } + static class Row { - private String name; - private int partition; - private Long timestamp; + private final String name; + private final int partition; + private final Long offset; - public Row(String name, int partition, Long timestamp) { + public Row(String name, int partition, Long offset) { this.name = name; this.partition = partition; - this.timestamp = timestamp; + this.offset = offset; + } + + @Override + public String toString() { + return "Row[name:" + name + ",partition:" + partition + ",offset:" + offset; } @Override @@ -113,12 +138,12 @@ public class GetOffsetShellTest { Row r = (Row) o; - return name.equals(r.name) && partition == r.partition && Objects.equals(timestamp, r.timestamp); + return name.equals(r.name) && partition == r.partition && Objects.equals(offset, r.offset); } @Override public int hashCode() { - return Objects.hash(name, partition, timestamp); + return Objects.hash(name, partition, offset); } } @@ -127,8 +152,11 @@ public class GetOffsetShellTest { setUp(); List<Row> output = executeAndParse(); - - assertEquals(expectedOffsetsWithInternal(), output); + if (!cluster.isKRaftTest()) { + assertEquals(expectedOffsetsWithInternal(), output); + } else { + assertEquals(expectedTestTopicOffsets(), output); + } } @ClusterTest @@ -148,7 +176,8 @@ public class GetOffsetShellTest { List<Row> offsets = executeAndParse("--topic", getTopicName(i)); assertEquals(expectedOffsetsForTopic(i), offsets, () -> "Offset output did not match for " + getTopicName(i)); - }); + } + ); } @ClusterTest @@ -165,8 +194,11 @@ public class GetOffsetShellTest { setUp(); List<Row> offsets = executeAndParse("--partitions", "0,1"); - - assertEquals(expectedOffsetsWithInternal().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets); + if (!cluster.isKRaftTest()) { + assertEquals(expectedOffsetsWithInternal().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets); + } else { + assertEquals(expectedTestTopicOffsets().stream().filter(r -> r.partition <= 1).collect(Collectors.toList()), offsets); + } } @ClusterTest @@ -182,6 +214,8 @@ public class GetOffsetShellTest { public void testTopicPartitionsArg() { setUp(); + createConsumerAndPoll(); + List<Row> offsets = executeAndParse("--topic-partitions", "topic1:0,topic2:1,topic(3|4):2,__.*:3"); List<Row> expected = Arrays.asList( new Row("__consumer_offsets", 3, 0L), @@ -236,7 +270,7 @@ public class GetOffsetShellTest { List<Row> offsets = executeAndParse("--topic-partitions", "topic.*", "--time", time); offsets.forEach( - row -> assertTrue(row.timestamp >= 0 && row.timestamp <= Integer.parseInt(row.name.replace("topic", ""))) + row -> assertTrue(row.offset >= 0 && row.offset <= Integer.parseInt(row.name.replace("topic", ""))) ); } } @@ -288,6 +322,8 @@ public class GetOffsetShellTest { public void testTopicPartitionsArgWithInternalIncluded() { setUp(); + createConsumerAndPoll(); + List<Row> offsets = executeAndParse("--topic-partitions", "__.*:0"); assertEquals(Arrays.asList(new Row("__consumer_offsets", 0, 0L)), offsets); @@ -320,8 +356,13 @@ public class GetOffsetShellTest { @ClusterTest public void testPrintHelp() { - String out = ToolsTestUtils.captureStandardErr(() -> GetOffsetShell.mainNoExit("--help")); - assertTrue(out.startsWith(GetOffsetShell.USAGE_TEXT)); + Exit.setExitProcedure((statusCode, message) -> { }); + try { + String out = ToolsTestUtils.captureStandardErr(() -> GetOffsetShell.mainNoExit("--help")); + assertTrue(out.startsWith(GetOffsetShell.USAGE_TEXT)); + } finally { + Exit.resetExitProcedure(); + } } @ClusterTest @@ -351,7 +392,7 @@ public class GetOffsetShellTest { } private List<Row> expectedOffsetsWithInternal() { - List<Row> consOffsets = IntStream.range(0, offsetTopicPartitionCount) + List<Row> consOffsets = IntStream.range(0, 4) .mapToObj(i -> new Row("__consumer_offsets", i, 0L)) .collect(Collectors.toList());