This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new a9c50b2  KAFKA-7195: Fix StreamStreamJoinIntegrationTest test failures 
(#5418)
a9c50b2 is described below

commit a9c50b29f1727becdda738a62b9241ca72fdf0d3
Author: Manikumar Reddy O <[email protected]>
AuthorDate: Tue Jul 24 14:59:01 2018 +0530

    KAFKA-7195: Fix StreamStreamJoinIntegrationTest test failures (#5418)
---
 .../integration/AbstractJoinIntegrationTest.java   |  2 +-
 .../integration/utils/EmbeddedKafkaCluster.java    | 22 ++++++++++++++++++++++
 2 files changed, 23 insertions(+), 1 deletion(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 80ab606..3e29fc2 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -163,7 +163,7 @@ public abstract class AbstractJoinIntegrationTest {
 
     @After
     public void cleanup() throws InterruptedException {
-        CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_LEFT, 
INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
+        CLUSTER.deleteAllTopicsAndWait(120000);
     }
 
     private void checkResult(final String outputTopic, final List<String> 
expectedResult) throws InterruptedException {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index ce6324d..ab52649 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -274,6 +274,24 @@ public class EmbeddedKafkaCluster extends ExternalResource 
{
         }
     }
 
+    /**
+     * Deletes all topics and blocks until all topics got deleted.
+     *
+     * @param timeoutMs the max time to wait for the topics to be deleted 
(does not block if {@code <= 0})
+     */
+    public void deleteAllTopicsAndWait(final long timeoutMs) throws 
InterruptedException {
+        final Set<String> topics = new 
HashSet<>(JavaConverters.seqAsJavaListConverter(zkUtils.getAllTopics()).asJava());
+        for (final String topic : topics) {
+            try {
+                brokers[0].deleteTopic(topic);
+            } catch (final UnknownTopicOrPartitionException e) { }
+        }
+
+        if (timeoutMs > 0) {
+            TestUtils.waitForCondition(new TopicsDeletedCondition(topics), 
timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
+        }
+    }
+
     public void deleteAndRecreateTopics(final String... topics) throws 
InterruptedException {
         deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
         createTopics(topics);
@@ -295,6 +313,10 @@ public class EmbeddedKafkaCluster extends ExternalResource 
{
             Collections.addAll(deletedTopics, topics);
         }
 
+        public TopicsDeletedCondition(final Set<String> topics) {
+            deletedTopics.addAll(topics);
+        }
+
         @Override
         public boolean conditionMet() {
             final Set<String> allTopics = new HashSet<>(

Reply via email to