This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch release-1.14 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 79a6688884cb62647456b1f6ea61bf04c301adf2 Author: Qingsheng Ren <[email protected]> AuthorDate: Mon Aug 23 17:18:13 2021 +0800 [hotfix][connector/testing-framework] Let ExternalContext#generateTestData returns List to preserve order (cherry picked from commit 27ec51c56ad23217b162c458d2878fe2af4e963f) --- .../testutils/KafkaSingleTopicExternalContext.java | 3 +- .../cases/MultipleTopicTemplateContext.java | 4 +-- .../cases/SingleTopicConsumingContext.java | 4 +-- .../pulsar/cases/KeySharedSubscriptionContext.java | 3 +- .../pulsar/cases/SharedSubscriptionContext.java | 3 +- .../test/common/external/ExternalContext.java | 6 ++-- .../common/testsuites/SourceTestSuiteBase.java | 33 +++++++++++----------- .../test/common/utils/TestDataMatchers.java | 31 ++++++++++---------- .../test/common/utils/TestDataMatchersTest.java | 3 +- 9 files changed, 42 insertions(+), 48 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java index ad5e31d..d816500 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/testutils/KafkaSingleTopicExternalContext.java @@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory; import org.testcontainers.containers.KafkaContainer; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -155,7 +154,7 @@ public class KafkaSingleTopicExternalContext implements ExternalContext<String> } @Override - public Collection<String> generateTestData(int splitIndex, long seed) { + public List<String> generateTestData(int splitIndex, long seed) { Random random = new Random(seed); List<String> randomStringRecords = new ArrayList<>(); int recordNum = diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java index a0801ec..0a56701 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/MultipleTopicTemplateContext.java @@ -32,8 +32,8 @@ import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; -import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; @@ -92,7 +92,7 @@ public abstract class MultipleTopicTemplateContext extends PulsarTestContext<Str } @Override - public Collection<String> generateTestData(int splitIndex, long seed) { + public List<String> generateTestData(int splitIndex, long seed) { return generateStringTestData(splitIndex, seed); } diff --git a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java index b89511c..e649898 100644 --- a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java +++ b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/testutils/cases/SingleTopicConsumingContext.java @@ -29,8 +29,8 @@ import org.apache.flink.connector.pulsar.testutils.PulsarTestContext; import org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment; import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; -import java.util.Collection; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; @@ -101,7 +101,7 @@ public class SingleTopicConsumingContext extends PulsarTestContext<String> { } @Override - public Collection<String> generateTestData(int splitIndex, long seed) { + public List<String> generateTestData(int splitIndex, long seed) { return generateStringTestData(splitIndex, seed); } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java index e442418..5d937ba 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/KeySharedSubscriptionContext.java @@ -36,7 +36,6 @@ import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.util.Murmur3_32Hash; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import static java.util.Collections.singletonList; @@ -116,7 +115,7 @@ public class KeySharedSubscriptionContext extends PulsarTestContext<String> { } @Override - public Collection<String> generateTestData(int splitIndex, long seed) { + public List<String> generateTestData(int splitIndex, long seed) { return generateStringTestData(splitIndex, seed); } diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java index f936b6f..52e30b3 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/cases/SharedSubscriptionContext.java @@ -33,7 +33,6 @@ import org.apache.pulsar.client.api.RegexSubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; import java.util.ArrayList; -import java.util.Collection; import java.util.List; import static org.apache.flink.connector.pulsar.source.reader.deserializer.PulsarDeserializationSchema.pulsarSchema; @@ -90,7 +89,7 @@ public class SharedSubscriptionContext extends PulsarTestContext<String> { } @Override - public Collection<String> generateTestData(int splitIndex, long seed) { + public List<String> generateTestData(int splitIndex, long seed) { return generateStringTestData(splitIndex, seed); } diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java index 5ed06a9..f67dcac 100644 --- a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/external/ExternalContext.java @@ -23,7 +23,7 @@ import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.Source; import java.io.Serializable; -import java.util.Collection; +import java.util.List; /** * Context of the test interacting with external system. @@ -64,9 +64,9 @@ public interface ExternalContext<T> extends Serializable, AutoCloseable { * * @param splitIndex index of the split. * @param seed Seed for generating random test data set. - * @return Collection of generated test data. + * @return List of generated test data. */ - Collection<T> generateTestData(int splitIndex, long seed); + List<T> generateTestData(int splitIndex, long seed); /** * Factory for {@link ExternalContext}. diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java index 5aff6e6..4639a3f 100644 --- a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java @@ -50,7 +50,6 @@ import org.slf4j.LoggerFactory; import java.time.Duration; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -102,7 +101,7 @@ public abstract class SourceTestSuiteBase<T> { throws Exception { // Write test data to external system - final Collection<T> testRecords = generateAndWriteTestData(0, externalContext); + final List<T> testRecords = generateAndWriteTestData(0, externalContext); // Build and execute Flink job StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(); @@ -137,9 +136,9 @@ public abstract class SourceTestSuiteBase<T> { throws Exception { final int splitNumber = 4; - final List<Collection<T>> testRecordCollections = new ArrayList<>(); + final List<List<T>> testRecordsLists = new ArrayList<>(); for (int i = 0; i < splitNumber; i++) { - testRecordCollections.add(generateAndWriteTestData(i, externalContext)); + testRecordsLists.add(generateAndWriteTestData(i, externalContext)); } LOG.debug("Build and execute Flink job"); @@ -153,7 +152,7 @@ public abstract class SourceTestSuiteBase<T> { .setParallelism(splitNumber) .executeAndCollect("Source Multiple Split Test")) { // Check test result - assertThat(resultIterator, matchesMultipleSplitTestData(testRecordCollections)); + assertThat(resultIterator, matchesMultipleSplitTestData(testRecordsLists)); } } @@ -177,9 +176,9 @@ public abstract class SourceTestSuiteBase<T> { throws Exception { final int splitNumber = 4; - final List<Collection<T>> testRecordCollections = new ArrayList<>(); + final List<List<T>> testRecordsLists = new ArrayList<>(); for (int i = 0; i < splitNumber; i++) { - testRecordCollections.add(generateAndWriteTestData(i, externalContext)); + testRecordsLists.add(generateAndWriteTestData(i, externalContext)); } try (CloseableIterator<T> resultIterator = @@ -189,8 +188,8 @@ public abstract class SourceTestSuiteBase<T> { WatermarkStrategy.noWatermarks(), "Tested Source") .setParallelism(splitNumber + 1) - .executeAndCollect("Redundant Parallelism Test")) { - assertThat(resultIterator, matchesMultipleSplitTestData(testRecordCollections)); + .executeAndCollect("Idle Reader Test")) { + assertThat(resultIterator, matchesMultipleSplitTestData(testRecordsLists)); } } @@ -216,7 +215,7 @@ public abstract class SourceTestSuiteBase<T> { throws Exception { int splitIndex = 0; - final Collection<T> testRecordsBeforeFailure = + final List<T> testRecordsBeforeFailure = externalContext.generateTestData( splitIndex, ThreadLocalRandom.current().nextLong()); final SourceSplitDataWriter<T> sourceSplitDataWriter = @@ -266,7 +265,7 @@ public abstract class SourceTestSuiteBase<T> { Collections.singletonList(JobStatus.RUNNING), Deadline.fromNow(Duration.ofSeconds(30))); - final Collection<T> testRecordsAfterFailure = + final List<T> testRecordsAfterFailure = externalContext.generateTestData( splitIndex, ThreadLocalRandom.current().nextLong()); sourceSplitDataWriter.writeRecords(testRecordsAfterFailure); @@ -290,15 +289,15 @@ public abstract class SourceTestSuiteBase<T> { * Generate a set of test records and write it to the given split writer. * * @param externalContext External context - * @return Collection of generated test records + * @return List of generated test records */ - protected Collection<T> generateAndWriteTestData( + protected List<T> generateAndWriteTestData( int splitIndex, ExternalContext<T> externalContext) { - final Collection<T> testRecordCollection = + final List<T> testRecords = externalContext.generateTestData( splitIndex, ThreadLocalRandom.current().nextLong()); - LOG.debug("Writing {} records to external system", testRecordCollection.size()); - externalContext.createSourceSplitDataWriter().writeRecords(testRecordCollection); - return testRecordCollection; + LOG.debug("Writing {} records to external system", testRecords.size()); + externalContext.createSourceSplitDataWriter().writeRecords(testRecords); + return testRecords; } } diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java index 87d4905..19e3478 100644 --- a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java @@ -24,7 +24,6 @@ import org.hamcrest.Description; import org.hamcrest.TypeSafeDiagnosingMatcher; import java.util.ArrayList; -import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.NoSuchElementException; @@ -35,15 +34,15 @@ public class TestDataMatchers { // ---------------------------- Matcher Builders ---------------------------------- public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData( - Collection<Collection<T>> testDataCollections) { - return new MultipleSplitDataMatcher<>(testDataCollections); + List<List<T>> testRecordsLists) { + return new MultipleSplitDataMatcher<>(testRecordsLists); } - public static <T> SplitDataMatcher<T> matchesSplitTestData(Collection<T> testData) { + public static <T> SplitDataMatcher<T> matchesSplitTestData(List<T> testData) { return new SplitDataMatcher<>(testData); } - public static <T> SplitDataMatcher<T> matchesSplitTestData(Collection<T> testData, int limit) { + public static <T> SplitDataMatcher<T> matchesSplitTestData(List<T> testData, int limit) { return new SplitDataMatcher<>(testData, limit); } @@ -57,17 +56,17 @@ public class TestDataMatchers { public static class SplitDataMatcher<T> extends TypeSafeDiagnosingMatcher<Iterator<T>> { private static final int UNSET = -1; - private final Collection<T> testData; + private final List<T> testData; private final int limit; private String mismatchDescription = null; - public SplitDataMatcher(Collection<T> testData) { + public SplitDataMatcher(List<T> testData) { this.testData = testData; this.limit = UNSET; } - public SplitDataMatcher(Collection<T> testData, int limit) { + public SplitDataMatcher(List<T> testData, int limit) { if (limit > testData.size()) { throw new IllegalArgumentException( "Limit validation size should be less than number of test records"); @@ -121,12 +120,12 @@ public class TestDataMatchers { /** * Matcher for validating test data from multiple splits. * - * <p>Each collection has a pointer (iterator) pointing to current checking record. When a - * record is received in the stream, it will be compared to all current pointing records in - * collections, and the pointer to the identical record will move forward. + * <p>Each list has a pointer (iterator) pointing to current checking record. When a record is + * received in the stream, it will be compared to all current pointing records in lists, and the + * pointer to the identical record will move forward. * * <p>If the stream preserves the correctness and order of records in all splits, all pointers - * should reach the end of the collection finally. + * should reach the end of the list finally. * * @param <T> Type of validating record */ @@ -136,9 +135,9 @@ public class TestDataMatchers { private String mismatchDescription = null; - public MultipleSplitDataMatcher(Collection<Collection<T>> testDataCollections) { - for (Collection<T> testDataCollection : testDataCollections) { - testDataIterators.add(new IteratorWithCurrent<>(testDataCollection.iterator())); + public MultipleSplitDataMatcher(List<List<T>> testRecordsLists) { + for (List<T> testRecords : testRecordsLists) { + testDataIterators.add(new IteratorWithCurrent<>(testRecords.iterator())); } } @@ -187,7 +186,7 @@ public class TestDataMatchers { } /** - * Whether all pointers have reached the end of collections. + * Whether all pointers have reached the end of lists. * * @return True if all pointers have reached the end. */ diff --git a/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java index 1833185..4e4d8a1 100644 --- a/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java +++ b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java @@ -102,8 +102,7 @@ public class TestDataMatchersTest { private final List<String> splitA = Arrays.asList("alpha", "beta", "gamma"); private final List<String> splitB = Arrays.asList("one", "two", "three"); private final List<String> splitC = Arrays.asList("1", "2", "3"); - private final List<Collection<String>> testDataCollection = - Arrays.asList(splitA, splitB, splitC); + private final List<List<String>> testDataCollection = Arrays.asList(splitA, splitB, splitC); @Test public void testPositiveCase() {
