This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8f4d8569773 MINOR: add helper function `createTopic` to
ClusterInstance (#16852)
8f4d8569773 is described below
commit 8f4d856977370997be70d6c5b4dccba054e6daaf
Author: TaiJuWu <[email protected]>
AuthorDate: Sat Aug 31 19:49:59 2024 +0800
MINOR: add helper function `createTopic` to ClusterInstance (#16852)
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/test/java/kafka/test/ClusterInstance.java | 8 ++++++++
.../java/kafka/test/ClusterTestExtensionsTest.java | 19 +++++++++++++++++++
2 files changed, 27 insertions(+)
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java
b/core/src/test/java/kafka/test/ClusterInstance.java
index 4d08241707a..75522c1ba7f 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -25,6 +25,7 @@ import kafka.test.annotation.ClusterTest;
import kafka.test.annotation.Type;
import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.test.TestUtils;
@@ -186,6 +187,13 @@ public interface ClusterInstance {
default void waitTopicDeletion(String topic) throws InterruptedException {
waitForTopic(topic, 0);
}
+
+ default void createTopic(String topicName, int partitions, short replicas)
throws InterruptedException {
+ try (Admin admin = createAdminClient()) {
+ admin.createTopics(Collections.singletonList(new
NewTopic(topicName, partitions, replicas)));
+ waitForTopic(topicName, partitions);
+ }
+ }
void waitForReadyBrokers() throws InterruptedException;
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 5f8aae823ba..7c06a441a0b 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.GroupProtocol;
+import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.server.common.MetadataVersion;
@@ -232,6 +233,24 @@ public class ClusterTestExtensionsTest {
Assertions.assertEquals(1,
clusterInstance.supportedGroupProtocols().size());
}
+
+
+ @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 3)
+ public void testCreateTopic(ClusterInstance clusterInstance) throws
Exception {
+ String topicName = "test";
+ int numPartition = 3;
+ short numReplicas = 3;
+ clusterInstance.createTopic(topicName, numPartition, numReplicas);
+
+ try (Admin admin = clusterInstance.createAdminClient()) {
+
Assertions.assertTrue(admin.listTopics().listings().get().stream().anyMatch(s
-> s.name().equals(topicName)));
+ List<TopicPartitionInfo> partitions =
admin.describeTopics(Collections.singleton(topicName)).allTopicNames().get()
+ .get(topicName).partitions();
+ Assertions.assertEquals(numPartition, partitions.size());
+ Assertions.assertTrue(partitions.stream().allMatch(partition ->
partition.replicas().size() == numReplicas));
+ }
+ }
+
@ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
public void testClusterAliveBrokers(ClusterInstance clusterInstance)
throws Exception {
clusterInstance.waitForReadyBrokers();