This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2c9d7afe4c6 MINOR: add helper method to `ClusterInstance` to wait 
topic deletion (#16627)
2c9d7afe4c6 is described below

commit 2c9d7afe4c60656d10f5b15367e3fb65173d0372
Author: TaiJuWu <[email protected]>
AuthorDate: Sun Aug 4 02:35:00 2024 +0800

    MINOR: add helper method to `ClusterInstance` to wait topic deletion 
(#16627)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 core/src/test/java/kafka/test/ClusterInstance.java       |  4 ++++
 .../test/java/kafka/test/ClusterTestExtensionsTest.java  | 16 ++++++++++++++++
 .../kafka/test/junit/RaftClusterInvocationContext.java   |  1 -
 .../kafka/test/junit/ZkClusterInvocationContext.java     | 16 ++++++++++++++++
 4 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/core/src/test/java/kafka/test/ClusterInstance.java 
b/core/src/test/java/kafka/test/ClusterInstance.java
index 41cc8b485f1..4d08241707a 100644
--- a/core/src/test/java/kafka/test/ClusterInstance.java
+++ b/core/src/test/java/kafka/test/ClusterInstance.java
@@ -183,6 +183,10 @@ public interface ClusterInstance {
 
     //---------------------------[wait]---------------------------//
 
+    default void waitTopicDeletion(String topic) throws InterruptedException {
+        waitForTopic(topic, 0);
+    }
+
     void waitForReadyBrokers() throws InterruptedException;
 
     default void waitForTopic(String topic, int partitions) throws 
InterruptedException {
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java 
b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 498936196ed..54633d2eb3a 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.admin.Config;
 import org.apache.kafka.clients.admin.DescribeLogDirsResult;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.GroupProtocol;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.server.common.MetadataVersion;
@@ -249,4 +250,19 @@ public class ClusterTestExtensionsTest {
         Assertions.assertTrue(clusterInstance.aliveBrokers().containsKey(0));
         Assertions.assertTrue(clusterInstance.brokers().containsKey(0));
     }
+
+
+    @ClusterTest(types = {Type.ZK, Type.CO_KRAFT, Type.KRAFT}, brokers = 4)
+    public void testVerifyTopicDeletion(ClusterInstance clusterInstance) 
throws Exception {
+        try (Admin admin = clusterInstance.createAdminClient()) {
+            String testTopic = "testTopic";
+            admin.createTopics(Collections.singletonList(new 
NewTopic(testTopic, 1, (short) 1)));
+            clusterInstance.waitForTopic(testTopic, 1);
+            admin.deleteTopics(Collections.singletonList(testTopic));
+            clusterInstance.waitTopicDeletion(testTopic);
+            
Assertions.assertTrue(admin.listTopics().listings().get().stream().noneMatch(
+                    topic -> topic.name().equals(testTopic)
+            ));
+        }
+    }
 }
diff --git 
a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java 
b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index e582cefaecc..f7cc416cc21 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -232,7 +232,6 @@ public class RaftClusterInvocationContext implements 
TestTemplateInvocationConte
             }
         }
 
-
         @Override
         public Map<Integer, KafkaBroker> brokers() {
             return clusterTestKit.brokers().entrySet()
diff --git 
a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java 
b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
index e85cbacc075..67955fd5d18 100644
--- a/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/ZkClusterInvocationContext.java
@@ -206,6 +206,22 @@ public class ZkClusterInvocationContext implements 
TestTemplateInvocationContext
             findBrokerOrThrow(brokerId).startup();
         }
 
+        @Override
+        public void waitTopicDeletion(String topic) throws 
InterruptedException {
+            org.apache.kafka.test.TestUtils.waitForCondition(
+                    () -> 
!clusterReference.get().zkClient().isTopicMarkedForDeletion(topic),
+                    String.format("Admin path /admin/delete_topics/%s path not 
deleted even after a replica is restarted", topic)
+            );
+
+            org.apache.kafka.test.TestUtils.waitForCondition(
+                    () -> 
!clusterReference.get().zkClient().topicExists(topic),
+                    String.format("Topic path /brokers/topics/%s not deleted 
after /admin/delete_topics/%s path is deleted", topic, topic)
+            );
+
+            ClusterInstance.super.waitTopicDeletion(topic);
+        }
+
+
         /**
          * Restart brokers with given cluster config.
          *

Reply via email to