This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e4034ac296d4e8b43af570d3436cd2dec33d9ef1
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Thu Jun 4 10:34:16 2020 +0200

    [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes
    
    Duplicate topic names and leftover data could be a potential source of
    instabilities.
---
 .../org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java   | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
index 4f62839..5e64159 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
@@ -43,6 +43,7 @@ import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 /**
@@ -65,6 +66,8 @@ public class StreamingKafkaITCase extends TestLogger {
 
        private final Path kafkaExampleJar;
 
+       private final String kafkaVersion;
+
        @Rule
        public final KafkaResource kafka;
 
@@ -81,14 +84,15 @@ public class StreamingKafkaITCase extends TestLogger {
        public StreamingKafkaITCase(final String kafkaExampleJarPattern, final String kafkaVersion) {
                this.kafkaExampleJar = TestUtils.getResourceJar(kafkaExampleJarPattern);
                this.kafka = KafkaResource.get(kafkaVersion);
+               this.kafkaVersion = kafkaVersion;
        }
 
        @Test
        public void testKafka() throws Exception {
                try (final ClusterController clusterController = flink.startCluster(1)) {
 
-                       final String inputTopic = "test-input";
-                       final String outputTopic = "test-output";
+                       final String inputTopic = "test-input-" + kafkaVersion + "-" + UUID.randomUUID().toString();
+                       final String outputTopic = "test-output" + kafkaVersion + "-" + UUID.randomUUID().toString();
 
                        // create the required topics
                        kafka.createTopic(1, 1, inputTopic);

Reply via email to