This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a03c8f13f735bf5b8d120fb7aa95587574b6dfd6
Author: Aljoscha Krettek <[email protected]>
AuthorDate: Thu Jun 4 15:20:19 2020 +0200

    [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent 
clashes
    
    Duplicate topic names and leftover data could be a potential source of 
instabilities.
---
 .../org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java    | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
index bf7358d..fabefc1 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -58,6 +58,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
 import static org.junit.Assert.assertThat;
@@ -92,6 +93,7 @@ public class SQLClientKafkaITCase extends TestLogger {
        @Rule
        public final TemporaryFolder tmp = new TemporaryFolder();
 
+       private final String kafkaVersion;
        private final String kafkaSQLVersion;
        private Path result;
        private Path sqlClientSessionConf;
@@ -107,6 +109,7 @@ public class SQLClientKafkaITCase extends TestLogger {
 
        public SQLClientKafkaITCase(String kafkaVersion, String 
kafkaSQLVersion, String kafkaSQLJarPattern) {
                this.kafka = KafkaResource.get(kafkaVersion);
+               this.kafkaVersion = kafkaVersion;
                this.kafkaSQLVersion = kafkaSQLVersion;
 
                this.sqlConnectorKafkaJar = 
TestUtils.getResourceJar(kafkaSQLJarPattern);
@@ -129,8 +132,8 @@ public class SQLClientKafkaITCase extends TestLogger {
        public void testKafka() throws Exception {
                try (ClusterController clusterController = 
flink.startCluster(2)) {
                        // Create topic and send message
-                       String testJsonTopic = "test-json";
-                       String testAvroTopic = "test-avro";
+                       String testJsonTopic = "test-json-" + kafkaVersion + 
"-" + UUID.randomUUID().toString();
+                       String testAvroTopic = "test-avro-" + kafkaVersion + 
"-" + UUID.randomUUID().toString();
                        kafka.createTopic(1, 1, testJsonTopic);
                        String[] messages = new String[]{
                                        "{\"timestamp\": 
\"2018-03-12T08:00:00Z\", \"user\": \"Alice\", \"event\": { \"type\": 
\"WARNING\", \"message\": \"This is a warning.\"}}",

Reply via email to