This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 79a6688884cb62647456b1f6ea61bf04c301adf2
Author: Qingsheng Ren <[email protected]>
AuthorDate: Mon Aug 23 17:18:13 2021 +0800

    [hotfix][connector/testing-framework] Let ExternalContext#generateTestData 
return List to preserve order
    
    (cherry picked from commit 27ec51c56ad23217b162c458d2878fe2af4e963f)
---
 .../testutils/KafkaSingleTopicExternalContext.java |  3 +-
 .../cases/MultipleTopicTemplateContext.java        |  4 +--
 .../cases/SingleTopicConsumingContext.java         |  4 +--
 .../pulsar/cases/KeySharedSubscriptionContext.java |  3 +-
 .../pulsar/cases/SharedSubscriptionContext.java    |  3 +-
 .../test/common/external/ExternalContext.java      |  6 ++--
 .../common/testsuites/SourceTestSuiteBase.java     | 33 +++++++++++-----------
 .../test/common/utils/TestDataMatchers.java        | 31 ++++++++++----------
 .../test/common/utils/TestDataMatchersTest.java    |  3 +-
 9 files changed, 42 insertions(+), 48 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
index ad5e31d..d816500 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.KafkaContainer;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -155,7 +154,7 @@ public class KafkaSingleTopicExternalContext implements 
ExternalContext<String>
     }
 
     @Override
-    public Collection<String> generateTestData(int splitIndex, long seed) {
+    public List<String> generateTestData(int splitIndex, long seed) {
         Random random = new Random(seed);
         List<String> randomStringRecords = new ArrayList<>();
         int recordNum =
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
index a0801ec..0a56701 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java
@@ -32,8 +32,8 @@ import 
org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 
-import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
@@ -92,7 +92,7 @@ public abstract class MultipleTopicTemplateContext extends 
PulsarTestContext<Str
     }
 
     @Override
-    public Collection<String> generateTestData(int splitIndex, long seed) {
+    public List<String> generateTestData(int splitIndex, long seed) {
         return generateStringTestData(splitIndex, seed);
     }
 
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
index b89511c..e649898 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java
@@ -29,8 +29,8 @@ import 
org.apache.flink.connector.pulsar.testutils.PulsarTestContext;
 import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
 
-import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 
@@ -101,7 +101,7 @@ public class SingleTopicConsumingContext extends 
PulsarTestContext<String> {
     }
 
     @Override
-    public Collection<String> generateTestData(int splitIndex, long seed) {
+    public List<String> generateTestData(int splitIndex, long seed) {
         return generateStringTestData(splitIndex, seed);
     }
 
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
index e442418..5d937ba 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java
@@ -36,7 +36,6 @@ import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.util.Murmur3_32Hash;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import static java.util.Collections.singletonList;
@@ -116,7 +115,7 @@ public class KeySharedSubscriptionContext extends 
PulsarTestContext<String> {
     }
 
     @Override
-    public Collection<String> generateTestData(int splitIndex, long seed) {
+    public List<String> generateTestData(int splitIndex, long seed) {
         return generateStringTestData(splitIndex, seed);
     }
 
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
index f936b6f..52e30b3 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java
@@ -33,7 +33,6 @@ import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.SubscriptionType;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 
 import static 
org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema;
@@ -90,7 +89,7 @@ public class SharedSubscriptionContext extends 
PulsarTestContext<String> {
     }
 
     @Override
-    public Collection<String> generateTestData(int splitIndex, long seed) {
+    public List<String> generateTestData(int splitIndex, long seed) {
         return generateStringTestData(splitIndex, seed);
     }
 
diff --git 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
index 5ed06a9..f67dcac 100644
--- 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
+++ 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java
@@ -23,7 +23,7 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.Source;
 
 import java.io.Serializable;
-import java.util.Collection;
+import java.util.List;
 
 /**
  * Context of the test interacting with external system.
@@ -64,9 +64,9 @@ public interface ExternalContext<T> extends Serializable, 
AutoCloseable {
      *
      * @param splitIndex index of the split.
      * @param seed Seed for generating random test data set.
-     * @return Collection of generated test data.
+     * @return List of generated test data.
      */
-    Collection<T> generateTestData(int splitIndex, long seed);
+    List<T> generateTestData(int splitIndex, long seed);
 
     /**
      * Factory for {@link ExternalContext}.
diff --git 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
index 5aff6e6..4639a3f 100644
--- 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
+++ 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
@@ -50,7 +50,6 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
@@ -102,7 +101,7 @@ public abstract class SourceTestSuiteBase<T> {
             throws Exception {
 
         // Write test data to external system
-        final Collection<T> testRecords = generateAndWriteTestData(0, 
externalContext);
+        final List<T> testRecords = generateAndWriteTestData(0, 
externalContext);
 
         // Build and execute Flink job
         StreamExecutionEnvironment execEnv = 
testEnv.createExecutionEnvironment();
@@ -137,9 +136,9 @@ public abstract class SourceTestSuiteBase<T> {
             throws Exception {
 
         final int splitNumber = 4;
-        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        final List<List<T>> testRecordsLists = new ArrayList<>();
         for (int i = 0; i < splitNumber; i++) {
-            testRecordCollections.add(generateAndWriteTestData(i, 
externalContext));
+            testRecordsLists.add(generateAndWriteTestData(i, externalContext));
         }
 
         LOG.debug("Build and execute Flink job");
@@ -153,7 +152,7 @@ public abstract class SourceTestSuiteBase<T> {
                         .setParallelism(splitNumber)
                         .executeAndCollect("Source Multiple Split Test")) {
             // Check test result
-            assertThat(resultIterator, 
matchesMultipleSplitTestData(testRecordCollections));
+            assertThat(resultIterator, 
matchesMultipleSplitTestData(testRecordsLists));
         }
     }
 
@@ -177,9 +176,9 @@ public abstract class SourceTestSuiteBase<T> {
             throws Exception {
 
         final int splitNumber = 4;
-        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        final List<List<T>> testRecordsLists = new ArrayList<>();
         for (int i = 0; i < splitNumber; i++) {
-            testRecordCollections.add(generateAndWriteTestData(i, 
externalContext));
+            testRecordsLists.add(generateAndWriteTestData(i, externalContext));
         }
 
         try (CloseableIterator<T> resultIterator =
@@ -189,8 +188,8 @@ public abstract class SourceTestSuiteBase<T> {
                                 WatermarkStrategy.noWatermarks(),
                                 "Tested Source")
                         .setParallelism(splitNumber + 1)
-                        .executeAndCollect("Redundant Parallelism Test")) {
-            assertThat(resultIterator, 
matchesMultipleSplitTestData(testRecordCollections));
+                        .executeAndCollect("Idle Reader Test")) {
+            assertThat(resultIterator, 
matchesMultipleSplitTestData(testRecordsLists));
         }
     }
 
@@ -216,7 +215,7 @@ public abstract class SourceTestSuiteBase<T> {
             throws Exception {
         int splitIndex = 0;
 
-        final Collection<T> testRecordsBeforeFailure =
+        final List<T> testRecordsBeforeFailure =
                 externalContext.generateTestData(
                         splitIndex, ThreadLocalRandom.current().nextLong());
         final SourceSplitDataWriter<T> sourceSplitDataWriter =
@@ -266,7 +265,7 @@ public abstract class SourceTestSuiteBase<T> {
                 Collections.singletonList(JobStatus.RUNNING),
                 Deadline.fromNow(Duration.ofSeconds(30)));
 
-        final Collection<T> testRecordsAfterFailure =
+        final List<T> testRecordsAfterFailure =
                 externalContext.generateTestData(
                         splitIndex, ThreadLocalRandom.current().nextLong());
         sourceSplitDataWriter.writeRecords(testRecordsAfterFailure);
@@ -290,15 +289,15 @@ public abstract class SourceTestSuiteBase<T> {
      * Generate a set of test records and write it to the given split writer.
      *
      * @param externalContext External context
-     * @return Collection of generated test records
+     * @return List of generated test records
      */
-    protected Collection<T> generateAndWriteTestData(
+    protected List<T> generateAndWriteTestData(
             int splitIndex, ExternalContext<T> externalContext) {
-        final Collection<T> testRecordCollection =
+        final List<T> testRecords =
                 externalContext.generateTestData(
                         splitIndex, ThreadLocalRandom.current().nextLong());
-        LOG.debug("Writing {} records to external system", 
testRecordCollection.size());
-        
externalContext.createSourceSplitDataWriter().writeRecords(testRecordCollection);
-        return testRecordCollection;
+        LOG.debug("Writing {} records to external system", testRecords.size());
+        
externalContext.createSourceSplitDataWriter().writeRecords(testRecords);
+        return testRecords;
     }
 }
diff --git 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java
 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java
index 87d4905..19e3478 100644
--- 
a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java
+++ 
b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java
@@ -24,7 +24,6 @@ import org.hamcrest.Description;
 import org.hamcrest.TypeSafeDiagnosingMatcher;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -35,15 +34,15 @@ public class TestDataMatchers {
 
     // ----------------------------  Matcher Builders 
----------------------------------
     public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
-            Collection<Collection<T>> testDataCollections) {
-        return new MultipleSplitDataMatcher<>(testDataCollections);
+            List<List<T>> testRecordsLists) {
+        return new MultipleSplitDataMatcher<>(testRecordsLists);
     }
 
-    public static <T> SplitDataMatcher<T> matchesSplitTestData(Collection<T> 
testData) {
+    public static <T> SplitDataMatcher<T> matchesSplitTestData(List<T> 
testData) {
         return new SplitDataMatcher<>(testData);
     }
 
-    public static <T> SplitDataMatcher<T> matchesSplitTestData(Collection<T> 
testData, int limit) {
+    public static <T> SplitDataMatcher<T> matchesSplitTestData(List<T> 
testData, int limit) {
         return new SplitDataMatcher<>(testData, limit);
     }
 
@@ -57,17 +56,17 @@ public class TestDataMatchers {
     public static class SplitDataMatcher<T> extends 
TypeSafeDiagnosingMatcher<Iterator<T>> {
         private static final int UNSET = -1;
 
-        private final Collection<T> testData;
+        private final List<T> testData;
         private final int limit;
 
         private String mismatchDescription = null;
 
-        public SplitDataMatcher(Collection<T> testData) {
+        public SplitDataMatcher(List<T> testData) {
             this.testData = testData;
             this.limit = UNSET;
         }
 
-        public SplitDataMatcher(Collection<T> testData, int limit) {
+        public SplitDataMatcher(List<T> testData, int limit) {
             if (limit > testData.size()) {
                 throw new IllegalArgumentException(
                         "Limit validation size should be less than number of 
test records");
@@ -121,12 +120,12 @@ public class TestDataMatchers {
     /**
      * Matcher for validating test data from multiple splits.
      *
-     * <p>Each collection has a pointer (iterator) pointing to current 
checking record. When a
-     * record is received in the stream, it will be compared to all current 
pointing records in
-     * collections, and the pointer to the identical record will move forward.
+     * <p>Each list has a pointer (iterator) pointing to current checking 
record. When a record is
+     * received in the stream, it will be compared to all current pointing 
records in lists, and the
+     * pointer to the identical record will move forward.
      *
      * <p>If the stream preserves the correctness and order of records in all 
splits, all pointers
-     * should reach the end of the collection finally.
+     * should reach the end of the list finally.
      *
      * @param <T> Type of validating record
      */
@@ -136,9 +135,9 @@ public class TestDataMatchers {
 
         private String mismatchDescription = null;
 
-        public MultipleSplitDataMatcher(Collection<Collection<T>> 
testDataCollections) {
-            for (Collection<T> testDataCollection : testDataCollections) {
-                testDataIterators.add(new 
IteratorWithCurrent<>(testDataCollection.iterator()));
+        public MultipleSplitDataMatcher(List<List<T>> testRecordsLists) {
+            for (List<T> testRecords : testRecordsLists) {
+                testDataIterators.add(new 
IteratorWithCurrent<>(testRecords.iterator()));
             }
         }
 
@@ -187,7 +186,7 @@ public class TestDataMatchers {
         }
 
         /**
-         * Whether all pointers have reached the end of collections.
+         * Whether all pointers have reached the end of lists.
          *
          * @return True if all pointers have reached the end.
          */
diff --git 
a/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java
 
b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java
index 1833185..4e4d8a1 100644
--- 
a/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java
+++ 
b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java
@@ -102,8 +102,7 @@ public class TestDataMatchersTest {
         private final List<String> splitA = Arrays.asList("alpha", "beta", 
"gamma");
         private final List<String> splitB = Arrays.asList("one", "two", 
"three");
         private final List<String> splitC = Arrays.asList("1", "2", "3");
-        private final List<Collection<String>> testDataCollection =
-                Arrays.asList(splitA, splitB, splitC);
+        private final List<List<String>> testDataCollection = 
Arrays.asList(splitA, splitB, splitC);
 
         @Test
         public void testPositiveCase() {

Reply via email to