This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 437ebb941e1 KAFKA-15729 Add KRaft support in GetOffsetShellTest 
(#15489)
437ebb941e1 is described below

commit 437ebb941e1485a6f1ad7afbba3814bfa310c9cf
Author: Owen Leung <owen.leu...@gmail.com>
AuthorDate: Sun Apr 14 21:13:49 2024 +0800

    KAFKA-15729 Add KRaft support in GetOffsetShellTest (#15489)
    
    Reviewers: Chia-Ping Tsai <chia7...@gmail.com>
---
 .../org/apache/kafka/tools/GetOffsetShellTest.java | 105 ++++++++++++++-------
 1 file changed, 73 insertions(+), 32 deletions(-)

diff --git a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java 
b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
index dad41342b7b..231222ec6a3 100644
--- a/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/GetOffsetShellTest.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.tools;
 
 import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterConfigProperty;
 import kafka.test.annotation.ClusterTest;
 import kafka.test.annotation.ClusterTestDefaults;
 import kafka.test.annotation.Type;
@@ -25,16 +26,19 @@ import kafka.test.junit.ClusterTestExtensions;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.common.utils.Exit;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
 import org.junit.jupiter.api.extension.ExtendWith;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -46,33 +50,29 @@ import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 
 @ExtendWith(value = ClusterTestExtensions.class)
-@ClusterTestDefaults(clusterType = Type.ZK)
+@ClusterTestDefaults(clusterType = Type.ALL, serverProperties = {
+    @ClusterConfigProperty(key = "auto.create.topics.enable", value = "false"),
+    @ClusterConfigProperty(key = "offsets.topic.replication.factor", value = 
"1"),
+    @ClusterConfigProperty(key = "offsets.topic.num.partitions", value = "4")
+})
 @Tag("integration")
 public class GetOffsetShellTest {
     private final int topicCount = 4;
-    private final int offsetTopicPartitionCount = 4;
     private final ClusterInstance cluster;
-    private final String topicName = "topic";
 
     public GetOffsetShellTest(ClusterInstance cluster) {
         this.cluster = cluster;
     }
 
     private String getTopicName(int i) {
-        return topicName + i;
-    }
-
-    @BeforeEach
-    public void before() {
-        cluster.config().serverProperties().put("auto.create.topics.enable", 
false);
-        
cluster.config().serverProperties().put("offsets.topic.replication.factor", 
"1");
-        
cluster.config().serverProperties().put("offsets.topic.num.partitions", 
String.valueOf(offsetTopicPartitionCount));
+        return "topic" + i;
     }
 
     private void setUp() {
-        try (Admin admin = 
Admin.create(cluster.config().adminClientProperties())) {
+        try (Admin admin = cluster.createAdminClient()) {
             List<NewTopic> topics = new ArrayList<>();
 
             IntStream.range(0, topicCount + 1).forEach(i -> topics.add(new 
NewTopic(getTopicName(i), i, (short) 1)));
@@ -81,28 +81,53 @@ public class GetOffsetShellTest {
         }
 
         Properties props = new Properties();
-        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
cluster.config().producerProperties().get("bootstrap.servers"));
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
 
         try (KafkaProducer<String, String> producer = new 
KafkaProducer<>(props)) {
             IntStream.range(0, topicCount + 1)
                 .forEach(i -> IntStream.range(0, i * i)
-                        .forEach(msgCount -> producer.send(
-                                new ProducerRecord<>(getTopicName(i), msgCount 
% i, null, "val" + msgCount)))
+                        .forEach(msgCount -> {
+                            assertDoesNotThrow(() -> producer.send(
+                                    new ProducerRecord<>(getTopicName(i), 
msgCount % i, null, "val" + msgCount)).get());
+                        })
                 );
         }
     }
 
+    private void createConsumerAndPoll() {
+        Properties props = new Properties();
+        props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class);
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
+        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+
+        try (KafkaConsumer<String, String> consumer = new 
KafkaConsumer<>(props)) {
+            List<String> topics = new ArrayList<>();
+            for (int i = 0; i < topicCount + 1; i++) {
+                topics.add(getTopicName(i));
+            }
+            consumer.subscribe(topics);
+            consumer.poll(Duration.ofMillis(1000));
+        }
+    }
+
     static class Row {
-        private String name;
-        private int partition;
-        private Long timestamp;
+        private final String name;
+        private final int partition;
+        private final Long offset;
 
-        public Row(String name, int partition, Long timestamp) {
+        public Row(String name, int partition, Long offset) {
             this.name = name;
             this.partition = partition;
-            this.timestamp = timestamp;
+            this.offset = offset;
+        }
+
+        @Override
+        public String toString() {
+            return "Row[name:" + name + ",partition:" + partition + ",offset:" 
+ offset;
         }
 
         @Override
@@ -113,12 +138,12 @@ public class GetOffsetShellTest {
 
             Row r = (Row) o;
 
-            return name.equals(r.name) && partition == r.partition && 
Objects.equals(timestamp, r.timestamp);
+            return name.equals(r.name) && partition == r.partition && 
Objects.equals(offset, r.offset);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(name, partition, timestamp);
+            return Objects.hash(name, partition, offset);
         }
     }
 
@@ -127,8 +152,11 @@ public class GetOffsetShellTest {
         setUp();
 
         List<Row> output = executeAndParse();
-
-        assertEquals(expectedOffsetsWithInternal(), output);
+        if (!cluster.isKRaftTest()) {
+            assertEquals(expectedOffsetsWithInternal(), output);
+        } else {
+            assertEquals(expectedTestTopicOffsets(), output);
+        }
     }
 
     @ClusterTest
@@ -148,7 +176,8 @@ public class GetOffsetShellTest {
             List<Row> offsets = executeAndParse("--topic", getTopicName(i));
 
             assertEquals(expectedOffsetsForTopic(i), offsets, () -> "Offset 
output did not match for " + getTopicName(i));
-        });
+            }
+        );
     }
 
     @ClusterTest
@@ -165,8 +194,11 @@ public class GetOffsetShellTest {
         setUp();
 
         List<Row> offsets = executeAndParse("--partitions", "0,1");
-
-        assertEquals(expectedOffsetsWithInternal().stream().filter(r -> 
r.partition <= 1).collect(Collectors.toList()), offsets);
+        if (!cluster.isKRaftTest()) {
+            assertEquals(expectedOffsetsWithInternal().stream().filter(r -> 
r.partition <= 1).collect(Collectors.toList()), offsets);
+        } else {
+            assertEquals(expectedTestTopicOffsets().stream().filter(r -> 
r.partition <= 1).collect(Collectors.toList()), offsets);
+        }
     }
 
     @ClusterTest
@@ -182,6 +214,8 @@ public class GetOffsetShellTest {
     public void testTopicPartitionsArg() {
         setUp();
 
+        createConsumerAndPoll();
+
         List<Row> offsets = executeAndParse("--topic-partitions", 
"topic1:0,topic2:1,topic(3|4):2,__.*:3");
         List<Row> expected = Arrays.asList(
                 new Row("__consumer_offsets", 3, 0L),
@@ -236,7 +270,7 @@ public class GetOffsetShellTest {
             List<Row> offsets = executeAndParse("--topic-partitions", 
"topic.*", "--time", time);
 
             offsets.forEach(
-                    row -> assertTrue(row.timestamp >= 0 && row.timestamp <= 
Integer.parseInt(row.name.replace("topic", "")))
+                    row -> assertTrue(row.offset >= 0 && row.offset <= 
Integer.parseInt(row.name.replace("topic", "")))
             );
         }
     }
@@ -288,6 +322,8 @@ public class GetOffsetShellTest {
     public void testTopicPartitionsArgWithInternalIncluded() {
         setUp();
 
+        createConsumerAndPoll();
+
         List<Row> offsets = executeAndParse("--topic-partitions", "__.*:0");
 
         assertEquals(Arrays.asList(new Row("__consumer_offsets", 0, 0L)), 
offsets);
@@ -320,8 +356,13 @@ public class GetOffsetShellTest {
 
     @ClusterTest
     public void testPrintHelp() {
-        String out = ToolsTestUtils.captureStandardErr(() -> 
GetOffsetShell.mainNoExit("--help"));
-        assertTrue(out.startsWith(GetOffsetShell.USAGE_TEXT));
+        Exit.setExitProcedure((statusCode, message) -> { });
+        try {
+            String out = ToolsTestUtils.captureStandardErr(() -> 
GetOffsetShell.mainNoExit("--help"));
+            assertTrue(out.startsWith(GetOffsetShell.USAGE_TEXT));
+        } finally {
+            Exit.resetExitProcedure();
+        }
     }
 
     @ClusterTest
@@ -351,7 +392,7 @@ public class GetOffsetShellTest {
     }
 
     private List<Row> expectedOffsetsWithInternal() {
-        List<Row> consOffsets = IntStream.range(0, offsetTopicPartitionCount)
+        List<Row> consOffsets = IntStream.range(0, 4)
                 .mapToObj(i -> new Row("__consumer_offsets", i, 0L))
                 .collect(Collectors.toList());
 

Reply via email to