This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 2c9d7afe4c6 MINOR: add helper method to `ClusterInstance` to wait topic deletion (#16627)
2c9d7afe4c6 is described below
commit 2c9d7afe4c60656d10f5b15367e3fb65173d0372
Author: TaiJuWu <[email protected]>
AuthorDate: Sun Aug 4 02:35:00 2024 +0800
MINOR: add helper method to `ClusterInstance` to wait topic deletion (#16627)
Reviewers: Chia-Ping Tsai <[email protected]>
---
core/src/test/java/kafka/test/ClusterInstance.java | 4 ++++
.../test/java/kafka/test/ClusterTestExtensionsTest.java | 16 ++++++++++++++++
.../kafka/test/junit/RaftClusterInvocationContext.java | 1 -
.../kafka/test/junit/ZkClusterInvocationContext.java | 16 ++++++++++++++++
4 files changed, 36 insertions(+), 1 deletion(-)
diff --git a/core/src/test/java/kafka/test/ClusterInstance.java b/core/src/test/java/kafka/test/ClusterInstance.java
index 41cc8b485f1..4d08241707a 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -183,6 +183,10 @@ public interface ClusterInstance {
//---------------------------[wait]---------------------------//
+ default void waitTopicDeletion(String topic) throws InterruptedException {
+ waitForTopic(topic, 0);
+ }
+
void waitForReadyBrokers() throws InterruptedException;
default void waitForTopic(String topic, int partitions) throws InterruptedException {
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 498936196ed..54633d2eb3a 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.GroupProtocol;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.server.common.MetadataVersion;
@@ -249,4 +250,19 @@ public class ClusterTestExtensionsTest {
Assertions.assertTrue(clusterInstance.aliveBrokers().containsKey(0));
Assertions.assertTrue(clusterInstance.brokers().containsKey(0));
}
+
+
+ @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
+ public void testVerifyTopicDeletion(ClusterInstance clusterInstance) throws Exception {
+ try (Admin admin = clusterInstance.createAdminClient()) {
+ String testTopic = "testTopic";
+ admin.createTopics(Collections.singletonList(new NewTopic(testTopic, 1, (short) 1)));
+ clusterInstance.waitForTopic(testTopic, 1);
+ admin.deleteTopics(Collections.singletonList(testTopic));
+ clusterInstance.waitTopicDeletion(testTopic);
+ Assertions.assertTrue(admin.listTopics().listings().get().stream().noneMatch(
+ topic -> topic.name().equals(testTopic)
+ ));
+ }
+ }
}
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index e582cefaecc..f7cc416cc21 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -232,7 +232,6 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
}
}
-
@Override
public Map<Integer, KafkaBroker> brokers() {
return clusterTestKit.brokers().entrySet()
diff --git a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index e85cbacc075..67955fd5d18 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -206,6 +206,22 @@ public class ZkClusterInvocationContext implements TestTemplateInvocationContext
findBrokerOrThrow(brokerId).startup();
}
+ @Override
+ public void waitTopicDeletion(String topic) throws InterruptedException {
+ org.apache.kafka.test.TestUtils.waitForCondition(
+ () -> !clusterReference.get().zkClient().isTopicMarkedForDeletion(topic),
+ String.format("Admin path /admin/delete_topics/%s path not deleted even after a replica is restarted", topic)
+ );
+
+ org.apache.kafka.test.TestUtils.waitForCondition(
+ () -> !clusterReference.get().zkClient().topicExists(topic),
+ String.format("Topic path /brokers/topics/%s not deleted after /admin/delete_topics/%s path is deleted", topic, topic)
+ );
+
+ ClusterInstance.super.waitTopicDeletion(topic);
+ }
+
+
/**
* Restart brokers with given cluster config.
*