This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8f4d8569773 MINOR: add helper function `createTopic` to 
ClusterInstance (#16852)
8f4d8569773 is described below

commit 8f4d856977370997be70d6c5b4dccba054e6daaf
Author: TaiJuWu <[email protected]>
AuthorDate: Sat Aug 31 19:49:59 2024 +0800

    MINOR: add helper function `createTopic` to ClusterInstance (#16852)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/test/java/kafka/test/ClusterInstance.java    |  8 ++++++++
 .../java/kafka/test/ClusterTestExtensionsTest.java    | 19 +++++++++++++++++++
 2 files changed, 27 insertions(+)

diff --git a/core/src/test/java/kafka/test/ClusterInstance.java 
b/core/src/test/java/kafka/test/ClusterInstance.java
index 4d08241707a..75522c1ba7f 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -25,6 +25,7 @@ import kafka.test.annotation.ClusterTest;
 import kafka.test.annotation.Type;
 
 import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.test.TestUtils;
@@ -186,6 +187,13 @@ public interface ClusterInstance {
     default void waitTopicDeletion(String topic) throws InterruptedException {
         waitForTopic(topic, 0);
     }
+    
+    default void createTopic(String topicName, int partitions, short replicas) 
throws InterruptedException {
+        try (Admin admin = createAdminClient()) {
+            admin.createTopics(Collections.singletonList(new 
NewTopic(topicName, partitions, replicas)));
+            waitForTopic(topicName, partitions);
+        }
+    }
 
     void waitForReadyBrokers() throws InterruptedException;
 
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java 
b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 5f8aae823ba..7c06a441a0b 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.DescribeLogDirsResult;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.TopicPartitionInfo;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.server.common.MetadataVersion;
 
@@ -232,6 +233,24 @@ public class ClusterTestExtensionsTest {
         Assertions.assertEquals(1, 
clusterInstance.supportedGroupProtocols().size());
     }
 
+
+
+    @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 3)
+    public void testCreateTopic(ClusterInstance clusterInstance) throws 
Exception {
+        String topicName = "test";
+        int numPartition = 3;
+        short numReplicas = 3;
+        clusterInstance.createTopic(topicName, numPartition, numReplicas);
+
+        try (Admin admin = clusterInstance.createAdminClient()) {
+            
Assertions.assertTrue(admin.listTopics().listings().get().stream().anyMatch(s 
-> s.name().equals(topicName)));
+            List<TopicPartitionInfo> partitions = 
admin.describeTopics(Collections.singleton(topicName)).allTopicNames().get()
+                    .get(topicName).partitions();
+            Assertions.assertEquals(numPartition, partitions.size());
+            Assertions.assertTrue(partitions.stream().allMatch(partition -> 
partition.replicas().size() == numReplicas));
+        }
+    }
+
     @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
     public void testClusterAliveBrokers(ClusterInstance clusterInstance) 
throws Exception {
         clusterInstance.waitForReadyBrokers();

Reply via email to