This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.0 by this push:
new a9c50b2 KAFKA-7195: Fix StreamStreamJoinIntegrationTest test failures (#5418)
a9c50b2 is described below
commit a9c50b29f1727becdda738a62b9241ca72fdf0d3
Author: Manikumar Reddy O <[email protected]>
AuthorDate: Tue Jul 24 14:59:01 2018 +0530
KAFKA-7195: Fix StreamStreamJoinIntegrationTest test failures (#5418)
---
.../integration/AbstractJoinIntegrationTest.java | 2 +-
.../integration/utils/EmbeddedKafkaCluster.java | 22 ++++++++++++++++++++++
2 files changed, 23 insertions(+), 1 deletion(-)
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
index 80ab606..3e29fc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -163,7 +163,7 @@ public abstract class AbstractJoinIntegrationTest {
@After
public void cleanup() throws InterruptedException {
- CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
+ CLUSTER.deleteAllTopicsAndWait(120000);
}
private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index ce6324d..ab52649 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -274,6 +274,24 @@ public class EmbeddedKafkaCluster extends ExternalResource {
}
}
+ /**
+ * Deletes all topics and blocks until all topics got deleted.
+ *
+ * @param timeoutMs the max time to wait for the topics to be deleted (does not block if {@code <= 0})
+ */
+ public void deleteAllTopicsAndWait(final long timeoutMs) throws InterruptedException {
+ final Set<String> topics = new HashSet<>(JavaConverters.seqAsJavaListConverter(zkUtils.getAllTopics()).asJava());
+ for (final String topic : topics) {
+ try {
+ brokers[0].deleteTopic(topic);
+ } catch (final UnknownTopicOrPartitionException e) { }
+ }
+
+ if (timeoutMs > 0) {
+ TestUtils.waitForCondition(new TopicsDeletedCondition(topics), timeoutMs, "Topics not deleted after " + timeoutMs + " milli seconds.");
+ }
+ }
+
public void deleteAndRecreateTopics(final String... topics) throws InterruptedException {
deleteTopicsAndWait(TOPIC_DELETION_TIMEOUT, topics);
createTopics(topics);
@@ -295,6 +313,10 @@ public class EmbeddedKafkaCluster extends ExternalResource {
Collections.addAll(deletedTopics, topics);
}
+ public TopicsDeletedCondition(final Set<String> topics) {
+ deletedTopics.addAll(topics);
+ }
+
@Override
public boolean conditionMet() {
final Set<String> allTopics = new HashSet<>(