This is an automated email from the ASF dual-hosted git repository.
leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f1e17c9 [FLINK-25840][tests] Add semantics test support for connector
test framework
f1e17c9 is described below
commit f1e17c9f39b876537ae2065c6331baa5302bbddf
Author: Hang Ruan <[email protected]>
AuthorDate: Thu Jan 27 16:57:27 2022 +0800
[FLINK-25840][tests] Add semantics test support for connector test framework
This closes #18547
---
.../connector/kafka/source/KafkaSourceITCase.java | 4 +
.../pulsar/source/PulsarSourceITCase.java | 5 +
.../flink/tests/util/kafka/KafkaSourceE2ECase.java | 5 +
.../util/pulsar/PulsarSourceOrderedE2ECase.java | 6 +
.../util/pulsar/PulsarSourceUnorderedE2ECase.java | 6 +
.../flink/formats/avro/AvroBulkFormatITCase.java | 6 +
.../testframe/junit/annotations/TestSemantics.java | 37 +++
.../extensions/ConnectorTestingExtension.java | 13 +
.../TestCaseInvocationContextProvider.java | 62 ++++-
.../testframe/testsuites/SourceTestSuiteBase.java | 100 ++++---
.../testframe/utils/CollectIteratorAssert.java | 261 ++++++++++++++++++
.../testframe/utils/CollectIteratorAssertions.java | 31 +++
.../testframe/utils/TestDataMatchers.java | 300 ---------------------
.../testframe/utils/CollectIteratorAssertTest.java | 151 +++++++++++
14 files changed, 649 insertions(+), 338 deletions(-)
diff --git
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index c215077..71108d2 100644
---
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -33,7 +33,9 @@ import
org.apache.flink.connector.testframe.external.DefaultContainerizedExterna
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -259,6 +261,8 @@ public class KafkaSourceITCase {
/** Integration test based on connector testing framework. */
@Nested
class IntegrationTests extends SourceTestSuiteBase<String> {
+ @TestSemantics
+ CheckpointingMode[] semantics = new CheckpointingMode[]
{CheckpointingMode.EXACTLY_ONCE};
// Defines test environment on Flink MiniCluster
@SuppressWarnings("unused")
diff --git
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 5e5dca2..b28e449 100644
---
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -27,7 +27,9 @@ import
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironme
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
/** Unit test class for {@link PulsarSource}. */
@SuppressWarnings("unused")
@@ -40,6 +42,9 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> {
@TestExternalSystem
PulsarTestEnvironment pulsar = new
PulsarTestEnvironment(PulsarRuntime.mock());
+ @TestSemantics
+ CheckpointingMode[] semantics = new CheckpointingMode[]
{CheckpointingMode.EXACTLY_ONCE};
+
// Defines an external context Factories,
// so test cases will be invoked using this external contexts.
@TestContext
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
index 2b6e8d5..eae1eda 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
@@ -23,7 +23,9 @@ import
org.apache.flink.connector.testframe.external.DefaultContainerizedExterna
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.tests.util.TestUtils;
import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
import org.apache.flink.util.DockerImageVersions;
@@ -40,6 +42,9 @@ import static
org.apache.flink.connector.kafka.testutils.KafkaSourceExternalCont
public class KafkaSourceE2ECase extends SourceTestSuiteBase<String> {
private static final String KAFKA_HOSTNAME = "kafka";
+ @TestSemantics
+ CheckpointingMode[] semantics = new CheckpointingMode[]
{CheckpointingMode.EXACTLY_ONCE};
+
// Defines TestEnvironment
@TestEnv FlinkContainerTestEnvironment flink = new
FlinkContainerTestEnvironment(1, 6);
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
index 8641f50..7d22e80 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
@@ -23,7 +23,9 @@ import
org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext;
import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext;
import
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
@@ -36,6 +38,10 @@ import static
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.
*/
public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> {
+ // Defines the Semantic.
+ @TestSemantics
+ CheckpointingMode[] semantics = new CheckpointingMode[]
{CheckpointingMode.EXACTLY_ONCE};
+
// Defines TestEnvironment.
@TestEnv
FlinkContainerWithPulsarEnvironment flink = new
FlinkContainerWithPulsarEnvironment(1, 6);
diff --git
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
index 4351147..d14d8f9 100644
---
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -23,6 +23,8 @@ import
org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext;
import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext;
import
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
@@ -36,6 +38,10 @@ import static
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.
*/
public class PulsarSourceUnorderedE2ECase extends
UnorderedSourceTestSuiteBase<String> {
+ // Defines the Semantic.
+ @TestSemantics
+ CheckpointingMode[] semantics = new CheckpointingMode[]
{CheckpointingMode.EXACTLY_ONCE};
+
// Defines TestEnvironment.
@TestEnv
FlinkContainerWithPulsarEnvironment flink = new
FlinkContainerWithPulsarEnvironment(1, 8);
diff --git
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java
index 86f0c15..8ccf344 100644
---
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java
+++
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java
@@ -30,8 +30,10 @@ import
org.apache.flink.connector.testframe.external.source.DataStreamSourceExte
import
org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
@@ -65,6 +67,10 @@ public class AvroBulkFormatITCase extends
SourceTestSuiteBase<RowData> {
private static final RowDataSerializer SERIALIZER = new
RowDataSerializer(ROW_TYPE);
@SuppressWarnings("unused")
+ @TestSemantics
+ CheckpointingMode[] semantics = new CheckpointingMode[]
{CheckpointingMode.EXACTLY_ONCE};
+
+ @SuppressWarnings("unused")
@TestEnv
MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
new file mode 100644
index 0000000..6e067ec
--- /dev/null
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.junit.annotations;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks the field in test class defining supported semantic: {@link
+ * org.apache.flink.streaming.api.CheckpointingMode}.
+ *
+ * <p>Only one field can be annotated in test class.
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+@Experimental
+public @interface TestSemantics {}
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
index 072abc2..beb945d 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
@@ -25,6 +25,8 @@ import
org.apache.flink.connector.testframe.external.ExternalContextFactory;
import org.apache.flink.connector.testframe.junit.annotations.TestContext;
import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
import
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.junit.jupiter.api.extension.AfterAllCallback;
import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -66,6 +68,7 @@ public class ConnectorTestingExtension implements
BeforeAllCallback, AfterAllCal
public static final String TEST_ENV_STORE_KEY = "testEnvironment";
public static final String EXTERNAL_SYSTEM_STORE_KEY = "externalSystem";
public static final String EXTERNAL_CONTEXT_FACTORIES_STORE_KEY =
"externalContext";
+ public static final String SUPPORTED_SEMANTIC_STORE_KEY =
"supportedSemantic";
private TestEnvironment testEnvironment;
private TestResource externalSystem;
@@ -93,6 +96,16 @@ public class ConnectorTestingExtension implements
BeforeAllCallback, AfterAllCal
.put(EXTERNAL_SYSTEM_STORE_KEY, externalSystem);
}
+ // Store supported semantic
+ final List<CheckpointingMode[]> semantics =
+ AnnotationSupport.findAnnotatedFieldValues(
+ context.getRequiredTestInstance(),
+ TestSemantics.class,
+ CheckpointingMode[].class);
+ checkExactlyOneAnnotatedField(semantics, TestSemantics.class);
+ context.getStore(TEST_RESOURCE_NAMESPACE)
+ .put(SUPPORTED_SEMANTIC_STORE_KEY, semantics.get(0));
+
// Search external context factories
final List<ExternalContextFactory> externalContextFactories =
AnnotationSupport.findAnnotatedFieldValues(
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/TestCaseInvocationContextProvider.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/TestCaseInvocationContextProvider.java
index faf541d..b320928 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/TestCaseInvocationContextProvider.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/TestCaseInvocationContextProvider.java
@@ -23,6 +23,7 @@ import
org.apache.flink.connector.testframe.environment.ClusterControllable;
import org.apache.flink.connector.testframe.environment.TestEnvironment;
import org.apache.flink.connector.testframe.external.ExternalContext;
import org.apache.flink.connector.testframe.external.ExternalContextFactory;
+import org.apache.flink.streaming.api.CheckpointingMode;
import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
import org.junit.jupiter.api.extension.Extension;
@@ -34,10 +35,12 @@ import
org.junit.jupiter.api.extension.TestTemplateInvocationContext;
import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
import java.util.Arrays;
+import java.util.LinkedList;
import java.util.List;
import java.util.stream.Stream;
import static
org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension.EXTERNAL_CONTEXT_FACTORIES_STORE_KEY;
+import static
org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension.SUPPORTED_SEMANTIC_STORE_KEY;
import static
org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension.TEST_ENV_STORE_KEY;
import static
org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension.TEST_RESOURCE_NAMESPACE;
@@ -77,13 +80,27 @@ public class TestCaseInvocationContextProvider implements
TestTemplateInvocation
context.getStore(TEST_RESOURCE_NAMESPACE)
.get(EXTERNAL_CONTEXT_FACTORIES_STORE_KEY);
+ // Fetch supported semantic from store
+ CheckpointingMode[] semantics =
+ (CheckpointingMode[])
+
context.getStore(TEST_RESOURCE_NAMESPACE).get(SUPPORTED_SEMANTIC_STORE_KEY);
+
// Create an invocation context for each external context factory
return externalContextFactories.stream()
- .map(
- factory ->
- new TestResourceProvidingInvocationContext(
- testEnv,
-
factory.createExternalContext(context.getDisplayName())));
+ .flatMap(
+ factory -> {
+ List<TestResourceProvidingInvocationContext>
result =
+ new LinkedList<>();
+ for (CheckpointingMode semantic : semantics) {
+ result.add(
+ new
TestResourceProvidingInvocationContext(
+ testEnv,
+ factory.createExternalContext(
+
context.getDisplayName()),
+ semantic));
+ }
+ return result.stream();
+ });
}
/**
@@ -94,18 +111,22 @@ public class TestCaseInvocationContextProvider implements
TestTemplateInvocation
private final TestEnvironment testEnvironment;
private final ExternalContext externalContext;
+ private final CheckpointingMode semantic;
public TestResourceProvidingInvocationContext(
- TestEnvironment testEnvironment, ExternalContext
externalContext) {
+ TestEnvironment testEnvironment,
+ ExternalContext externalContext,
+ CheckpointingMode semantic) {
this.testEnvironment = testEnvironment;
this.externalContext = externalContext;
+ this.semantic = semantic;
}
@Override
public String getDisplayName(int invocationIndex) {
return String.format(
- "TestEnvironment: [%s], ExternalContext: [%s]",
- testEnvironment, externalContext);
+ "TestEnvironment: [%s], ExternalContext: [%s], Semantic:
[%s]",
+ testEnvironment, externalContext, semantic);
}
@Override
@@ -115,11 +136,36 @@ public class TestCaseInvocationContextProvider implements
TestTemplateInvocation
new TestEnvironmentResolver(testEnvironment),
new ExternalContextProvider(externalContext),
new ClusterControllableProvider(testEnvironment),
+ new SemanticResolver(semantic),
// Extension for closing external context
(AfterTestExecutionCallback) ignore ->
externalContext.close());
}
}
+ private static class SemanticResolver implements ParameterResolver {
+
+ private final CheckpointingMode semantic;
+
+ private SemanticResolver(CheckpointingMode semantic) {
+ this.semantic = semantic;
+ }
+
+ @Override
+ public boolean supportsParameter(
+ ParameterContext parameterContext, ExtensionContext
extensionContext)
+ throws ParameterResolutionException {
+ return isAssignableFromParameterType(
+ CheckpointingMode.class,
parameterContext.getParameter().getType());
+ }
+
+ @Override
+ public Object resolveParameter(
+ ParameterContext parameterContext, ExtensionContext
extensionContext)
+ throws ParameterResolutionException {
+ return this.semantic;
+ }
+ }
+
private static class TestEnvironmentResolver implements ParameterResolver {
private final TestEnvironment testEnvironment;
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
index 3231fb1..5de4c5b 100644
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
@@ -33,9 +33,8 @@ import
org.apache.flink.connector.testframe.external.source.DataStreamSourceExte
import
org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
import
org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
import
org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
-import org.apache.flink.connector.testframe.utils.TestDataMatchers;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -48,7 +47,6 @@ import
org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLoggerExtension;
-import org.hamcrest.MatcherAssert;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.TestTemplate;
@@ -59,11 +57,17 @@ import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadLocalRandom;
+import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
+import static
org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.assertj.core.api.Assertions.assertThat;
+
/**
* Base class for all test suites.
*
@@ -103,13 +107,15 @@ public abstract class SourceTestSuiteBase<T> {
@TestTemplate
@DisplayName("Test source with single split")
public void testSourceSingleSplit(
- TestEnvironment testEnv, DataStreamSourceExternalContext<T>
externalContext)
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<T> externalContext,
+ CheckpointingMode semantic)
throws Exception {
// Step 1: Preparation
TestingSourceSettings sourceSettings =
TestingSourceSettings.builder()
.setBoundedness(Boundedness.BOUNDED)
- .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+ .setCheckpointingMode(semantic)
.build();
TestEnvironmentSettings envSettings =
TestEnvironmentSettings.builder()
@@ -128,11 +134,11 @@ public abstract class SourceTestSuiteBase<T> {
CollectIteratorBuilder<T> iteratorBuilder = addCollectSink(stream);
JobClient jobClient = submitJob(execEnv, "Source Single Split Test");
- // Step 4: Validate test data
- try (CloseableIterator<T> resultIterator =
iteratorBuilder.build(jobClient)) {
+ // Validate test data
+ try (CollectResultIterator<T> resultIterator =
iteratorBuilder.build(jobClient)) {
+ // Check test result
LOG.info("Checking test results");
- MatcherAssert.assertThat(
- resultIterator,
TestDataMatchers.matchesSplitTestData(testRecords));
+ checkResultWithSemantic(resultIterator,
Arrays.asList(testRecords), semantic, null);
}
}
@@ -151,13 +157,15 @@ public abstract class SourceTestSuiteBase<T> {
@TestTemplate
@DisplayName("Test source with multiple splits")
public void testMultipleSplits(
- TestEnvironment testEnv, DataStreamSourceExternalContext<T>
externalContext)
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<T> externalContext,
+ CheckpointingMode semantic)
throws Exception {
// Step 1: Preparation
TestingSourceSettings sourceSettings =
TestingSourceSettings.builder()
.setBoundedness(Boundedness.BOUNDED)
- .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+ .setCheckpointingMode(semantic)
.build();
TestEnvironmentSettings envOptions =
TestEnvironmentSettings.builder()
@@ -184,9 +192,7 @@ public abstract class SourceTestSuiteBase<T> {
try (CloseableIterator<T> resultIterator =
iteratorBuilder.build(jobClient)) {
// Check test result
LOG.info("Checking test results");
- MatcherAssert.assertThat(
- resultIterator,
-
TestDataMatchers.matchesMultipleSplitTestData(testRecordsLists));
+ checkResultWithSemantic(resultIterator, testRecordsLists,
semantic, null);
}
}
@@ -207,13 +213,15 @@ public abstract class SourceTestSuiteBase<T> {
@TestTemplate
@DisplayName("Test source with at least one idle parallelism")
public void testIdleReader(
- TestEnvironment testEnv, DataStreamSourceExternalContext<T>
externalContext)
+ TestEnvironment testEnv,
+ DataStreamSourceExternalContext<T> externalContext,
+ CheckpointingMode semantic)
throws Exception {
// Step 1: Preparation
TestingSourceSettings sourceSettings =
TestingSourceSettings.builder()
.setBoundedness(Boundedness.BOUNDED)
- .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+ .setCheckpointingMode(semantic)
.build();
TestEnvironmentSettings envOptions =
TestEnvironmentSettings.builder()
@@ -239,9 +247,7 @@ public abstract class SourceTestSuiteBase<T> {
// Step 4: Validate test data
try (CloseableIterator<T> resultIterator =
iteratorBuilder.build(jobClient)) {
LOG.info("Checking test results");
- MatcherAssert.assertThat(
- resultIterator,
-
TestDataMatchers.matchesMultipleSplitTestData(testRecordsLists));
+ checkResultWithSemantic(resultIterator, testRecordsLists,
semantic, null);
}
}
@@ -263,13 +269,14 @@ public abstract class SourceTestSuiteBase<T> {
public void testTaskManagerFailure(
TestEnvironment testEnv,
DataStreamSourceExternalContext<T> externalContext,
- ClusterControllable controller)
+ ClusterControllable controller,
+ CheckpointingMode semantic)
throws Exception {
// Step 1: Preparation
TestingSourceSettings sourceSettings =
TestingSourceSettings.builder()
.setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED)
- .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+ .setCheckpointingMode(semantic)
.build();
TestEnvironmentSettings envOptions =
TestEnvironmentSettings.builder()
@@ -302,17 +309,18 @@ public abstract class SourceTestSuiteBase<T> {
// Step 4: Validate records before killing TaskManagers
CloseableIterator<T> iterator = iteratorBuilder.build(jobClient);
LOG.info("Checking records before killing TaskManagers");
- MatcherAssert.assertThat(
+ checkResultWithSemantic(
iterator,
- TestDataMatchers.matchesSplitTestData(
- testRecordsBeforeFailure,
testRecordsBeforeFailure.size()));
+ Arrays.asList(testRecordsBeforeFailure),
+ semantic,
+ testRecordsBeforeFailure.size());
// Step 5: Trigger TaskManager failover
LOG.info("Trigger TaskManager failover");
controller.triggerTaskManagerFailover(jobClient, () -> {});
LOG.info("Waiting for job recovering from failure");
- CommonTestUtils.waitForJobStatus(
+ waitForJobStatus(
jobClient,
Collections.singletonList(JobStatus.RUNNING),
Deadline.fromNow(Duration.ofSeconds(30)));
@@ -329,14 +337,15 @@ public abstract class SourceTestSuiteBase<T> {
// Step 7: Validate test result
LOG.info("Checking records after job failover");
- MatcherAssert.assertThat(
+ checkResultWithSemantic(
iterator,
- TestDataMatchers.matchesSplitTestData(
- testRecordsAfterFailure,
testRecordsAfterFailure.size()));
+ Arrays.asList(testRecordsAfterFailure),
+ semantic,
+ testRecordsAfterFailure.size());
// Step 8: Clean up
- CommonTestUtils.terminateJob(jobClient, Duration.ofSeconds(30));
- CommonTestUtils.waitForJobStatus(
+ terminateJob(jobClient, Duration.ofSeconds(30));
+ waitForJobStatus(
jobClient,
Collections.singletonList(JobStatus.CANCELED),
Deadline.fromNow(Duration.ofSeconds(30)));
@@ -400,6 +409,37 @@ public abstract class SourceTestSuiteBase<T> {
stream.getExecutionEnvironment().getCheckpointConfig());
}
+ /**
+ * Compare the test data with the result.
+ *
+ * <p>If the source is bounded, limit should be null.
+ *
+ * @param resultIterator the data read from the job
+ * @param testData the test data
+ * @param semantic the supported semantic, see {@link CheckpointingMode}
+ * @param limit expected number of the data to read from the job
+ */
+ private void checkResultWithSemantic(
+ CloseableIterator<T> resultIterator,
+ List<List<T>> testData,
+ CheckpointingMode semantic,
+ Integer limit) {
+ if (limit != null) {
+ assertThat(
+ CompletableFuture.supplyAsync(
+ () -> {
+
CollectIteratorAssertions.assertThat(resultIterator)
+ .withNumRecordsLimit(limit)
+
.matchesRecordsFromSource(testData, semantic);
+ return true;
+ }))
+ .succeedsWithin(Duration.ofSeconds(30));
+ } else {
+ CollectIteratorAssertions.assertThat(resultIterator)
+ .matchesRecordsFromSource(testData, semantic);
+ }
+ }
+
/** Builder class for constructing {@link CollectResultIterator} of
collect sink. */
protected static class CollectIteratorBuilder<T> {
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssert.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssert.java
new file mode 100644
index 0000000..5a291bd
--- /dev/null
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssert.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.assertj.core.api.AbstractAssert;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This assertion is used to compare records in the collect iterator to the
target test data with
+ * different semantics (AT_LEAST_ONCE, EXACTLY_ONCE).
+ *
+ * @param <T> The type of records in the test data and collect iterator
+ */
+public class CollectIteratorAssert<T>
+ extends AbstractAssert<CollectIteratorAssert<T>, Iterator<T>> {
+
+ private final Iterator<T> collectorIterator;
+ private final List<RecordsFromSplit<T>> recordsFromSplits = new
ArrayList<>();
+ private int totalNumRecords;
+ private Integer limit = null;
+
+ protected CollectIteratorAssert(Iterator<T> collectorIterator) {
+ super(collectorIterator, CollectIteratorAssert.class);
+ this.collectorIterator = collectorIterator;
+ }
+
+ public CollectIteratorAssert<T> withNumRecordsLimit(int limit) {
+ this.limit = limit;
+ return this;
+ }
+
+ public void matchesRecordsFromSource(
+ List<List<T>> recordsBySplitsFromSource, CheckpointingMode
semantic) {
+ for (List<T> recordsFromSplit : recordsBySplitsFromSource) {
+ recordsFromSplits.add(new RecordsFromSplit<>(recordsFromSplit));
+ totalNumRecords += recordsFromSplit.size();
+ }
+
+ if (limit != null && limit > totalNumRecords) {
+ throw new IllegalArgumentException(
+ "Limit validation size should be less than total number of
records from source");
+ }
+
+ switch (semantic) {
+ case AT_LEAST_ONCE:
+ compareWithAtLeastOnceSemantic(collectorIterator,
recordsFromSplits);
+ break;
+ case EXACTLY_ONCE:
+ compareWithExactlyOnceSemantic(collectorIterator,
recordsFromSplits);
+ break;
+ default:
+ throw new IllegalArgumentException(
+ String.format("Unrecognized semantic \"%s\"",
semantic));
+ }
+ }
+
+ private void compareWithAtLeastOnceSemantic(
+ Iterator<T> resultIterator, List<RecordsFromSplit<T>>
recordsFromSplits) {
+ List<T> duplicateRead = new LinkedList<>();
+
+ int recordCounter = 0;
+ while (resultIterator.hasNext()) {
+ final T record = resultIterator.next();
+ if (!matchThenNext(record)) {
+ duplicateRead.add(record);
+ } else {
+ recordCounter++;
+ }
+
+ if (limit != null && recordCounter >= limit) {
+ break;
+ }
+ }
+ if (limit == null && !hasReachedEnd()) {
+ failWithMessage(
+ generateMismatchDescription(
+ String.format(
+ "Expected to have at least %d records in
result, but only received %d records",
+ recordsFromSplits.stream()
+ .mapToInt(
+ recordsFromSplit ->
+
recordsFromSplit.records.size())
+ .sum(),
+ recordCounter),
+ resultIterator));
+ } else {
+ confirmDuplicateRead(duplicateRead);
+ }
+ }
+
+ private void compareWithExactlyOnceSemantic(
+ Iterator<T> resultIterator, List<RecordsFromSplit<T>>
recordsFromSplits) {
+ int recordCounter = 0;
+ while (resultIterator.hasNext()) {
+ final T record = resultIterator.next();
+ if (!matchThenNext(record)) {
+ if (recordCounter >= totalNumRecords) {
+ failWithMessage(
+ generateMismatchDescription(
+ String.format(
+ "Expected to have exactly %d
records in result, but received more records",
+ recordsFromSplits.stream()
+ .mapToInt(
+ recordsFromSplit ->
+
recordsFromSplit.records.size())
+ .sum()),
+ resultIterator));
+ } else {
+ failWithMessage(
+ generateMismatchDescription(
+ String.format(
+ "Unexpected record '%s' at
position %d",
+ record, recordCounter),
+ resultIterator));
+ }
+ }
+ recordCounter++;
+ if (limit != null && recordCounter >= limit) {
+ break;
+ }
+ }
+ if (limit == null && !hasReachedEnd()) {
+ failWithMessage(
+ generateMismatchDescription(
+ String.format(
+ "Expected to have exactly %d records in
result, but only received %d records",
+ recordsFromSplits.stream()
+ .mapToInt(
+ recordsFromSplit ->
+
recordsFromSplit.records.size())
+ .sum(),
+ recordCounter),
+ resultIterator));
+ }
+ }
+
+ private void confirmDuplicateRead(List<T> duplicateRead) {
+ for (T record : duplicateRead) {
+ boolean found = false;
+ for (RecordsFromSplit<T> recordsFromSplit : recordsFromSplits) {
+ if (recordsFromSplit.records.contains(record)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ failWithMessage(String.format("Unexpected duplicate record
'%s'", record));
+ }
+ }
+ }
+
+ /**
+ * Check whether the current (pointed-to) record of any split is identical
to the record from the stream,
+ * and move that split's pointer to the next record if matched.
+ *
+ * @param record Record from stream
+ */
+ private boolean matchThenNext(T record) {
+ for (RecordsFromSplit<T> recordsFromSplit : recordsFromSplits) {
+ if (!recordsFromSplit.hasNext()) {
+ continue;
+ }
+ if (record.equals(recordsFromSplit.current())) {
+ recordsFromSplit.forward();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Whether all pointers have reached the end of lists.
+ *
+ * @return True if all pointers have reached the end.
+ */
+ private boolean hasReachedEnd() {
+ for (RecordsFromSplit<T> recordsFromSplit : recordsFromSplits) {
+ if (recordsFromSplit.hasNext()) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private String generateMismatchDescription(String reason, Iterator<T>
resultIterator) {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(reason).append("\n");
+ sb.append("Current progress of multiple split test data
validation:\n");
+ int splitCounter = 0;
+ for (RecordsFromSplit<T> recordsFromSplit : recordsFromSplits) {
+ sb.append(
+ String.format(
+ "Split %d (%d/%d): \n",
+ splitCounter++,
+ recordsFromSplit.offset,
+ recordsFromSplit.records.size()));
+ for (int recordIndex = 0;
+ recordIndex < recordsFromSplit.records.size();
+ recordIndex++) {
+ sb.append(recordsFromSplit.records.get(recordIndex));
+ if (recordIndex == recordsFromSplit.offset) {
+ sb.append("\t<----");
+ }
+ sb.append("\n");
+ }
+ }
+ if (resultIterator.hasNext()) {
+ sb.append("Remaining received elements after the unexpected one:
\n");
+ while (resultIterator.hasNext()) {
+ sb.append(resultIterator.next()).append("\n");
+ }
+ }
+ return sb.toString();
+ }
+
+ private static class RecordsFromSplit<T> {
+ private int offset = 0;
+ private final List<T> records;
+
+ public RecordsFromSplit(List<T> records) {
+ this.records = records;
+ }
+
+ public T current() {
+ if (!hasNext()) {
+ return null;
+ }
+ return records.get(offset);
+ }
+
+ public void forward() {
+ ++offset;
+ }
+
+ public boolean hasNext() {
+ return offset < records.size();
+ }
+ }
+}
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java
new file mode 100644
index 0000000..8bbb529
--- /dev/null
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import java.util.Iterator;
+
+/**
+ * Entry point for assertion methods for {@link CollectIteratorAssert}. Each
method in this class is
+ * a static factory.
+ */
+public class CollectIteratorAssertions {
+ public static <T> CollectIteratorAssert<T> assertThat(Iterator<T> actual) {
+ return new CollectIteratorAssert<>(actual);
+ }
+}
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestDataMatchers.java
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestDataMatchers.java
deleted file mode 100644
index 2420fde..0000000
---
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestDataMatchers.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.testframe.utils;
-
-import org.apache.flink.annotation.Internal;
-
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeDiagnosingMatcher;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/** Matchers for validating test data. */
-@Internal
-public class TestDataMatchers {
-
- // ---------------------------- Matcher Builders
----------------------------------
- public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
- List<List<T>> testRecordsLists) {
- return new MultipleSplitDataMatcher<>(testRecordsLists);
- }
-
- public static <T> SingleSplitDataMatcher<T> matchesSplitTestData(List<T>
testData) {
- return new SingleSplitDataMatcher<>(testData);
- }
-
- public static <T> SingleSplitDataMatcher<T> matchesSplitTestData(List<T>
testData, int limit) {
- return new SingleSplitDataMatcher<>(testData, limit);
- }
-
- // ---------------------------- Matcher Definitions
--------------------------------
-
- /**
- * Matcher for validating test data in a single split.
- *
- * @param <T> Type of validating record
- */
- public static class SingleSplitDataMatcher<T> extends
TypeSafeDiagnosingMatcher<Iterator<T>> {
- private static final int UNSET = -1;
-
- private final List<T> testData;
- private final int limit;
-
- private String mismatchDescription = null;
-
- public SingleSplitDataMatcher(List<T> testData) {
- this.testData = testData;
- this.limit = UNSET;
- }
-
- public SingleSplitDataMatcher(List<T> testData, int limit) {
- if (limit > testData.size()) {
- throw new IllegalArgumentException(
- "Limit validation size should be less than number of
test records");
- }
- this.testData = testData;
- this.limit = limit;
- }
-
- @Override
- protected boolean matchesSafely(Iterator<T> resultIterator,
Description description) {
- if (mismatchDescription != null) {
- description.appendText(mismatchDescription);
- return false;
- }
-
- boolean dataMismatch = false;
- boolean sizeMismatch = false;
- String sizeMismatchDescription = "";
- String dataMismatchDescription = "";
- int recordCounter = 0;
- for (T testRecord : testData) {
- if (!resultIterator.hasNext()) {
- sizeMismatchDescription =
- String.format(
- "Expected to have %d records in result,
but only received %d records",
- limit == UNSET ? testData.size() : limit,
recordCounter);
- sizeMismatch = true;
- break;
- }
- T resultRecord = resultIterator.next();
- if (!testRecord.equals(resultRecord)) {
- dataMismatchDescription =
- String.format(
- "Mismatched record at position %d:
Expected '%s' but was '%s'",
- recordCounter, testRecord, resultRecord);
- dataMismatch = true;
- }
- recordCounter++;
- if (limit != UNSET && recordCounter >= limit) {
- break;
- }
- }
-
- if (limit == UNSET && resultIterator.hasNext()) {
- sizeMismatchDescription =
- String.format(
- "Expected to have exactly %d records in
result, "
- + "but result iterator hasn't reached
the end",
- testData.size());
- sizeMismatch = true;
- }
-
- if (dataMismatch && sizeMismatch) {
- mismatchDescription = sizeMismatchDescription + " And " +
dataMismatchDescription;
- return false;
- } else if (dataMismatch) {
- mismatchDescription = dataMismatchDescription;
- return false;
- } else if (sizeMismatch) {
- mismatchDescription = sizeMismatchDescription;
- return false;
- } else {
- return true;
- }
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText(
- "Records consumed by Flink should be identical to test
data "
- + "and preserve the order in split");
- }
- }
-
- /**
- * Matcher for validating test data from multiple splits.
- *
- * <p>Each list has a pointer (iterator) pointing to current checking
record. When a record is
- * received in the stream, it will be compared to all current pointing
records in lists, and the
- * pointer to the identical record will move forward.
- *
- * <p>If the stream preserves the correctness and order of records in all
splits, all pointers
- * should reach the end of the list finally.
- *
- * @param <T> Type of validating record
- */
- public static class MultipleSplitDataMatcher<T> extends
TypeSafeDiagnosingMatcher<Iterator<T>> {
-
- List<TestRecords<T>> testRecordsLists = new ArrayList<>();
-
- private String mismatchDescription = null;
-
- public MultipleSplitDataMatcher(List<List<T>> testData) {
- for (List<T> testRecordsList : testData) {
- this.testRecordsLists.add(new TestRecords<>(testRecordsList));
- }
- }
-
- @Override
- protected boolean matchesSafely(Iterator<T> resultIterator,
Description description) {
- if (mismatchDescription != null) {
- description.appendText(mismatchDescription);
- return false;
- }
-
- int recordCounter = 0;
- while (resultIterator.hasNext()) {
- final T record = resultIterator.next();
- if (!matchThenNext(record)) {
- this.mismatchDescription =
- generateMismatchDescription(
- String.format(
- "Unexpected record '%s' at
position %d",
- record, recordCounter),
- resultIterator);
- return false;
- }
- recordCounter++;
- }
-
- if (!hasReachedEnd()) {
- this.mismatchDescription =
- generateMismatchDescription(
- String.format(
- "Expected to have exactly %d records
in result, but only received %d records",
- testRecordsLists.stream()
- .mapToInt(list ->
list.records.size())
- .sum(),
- recordCounter),
- resultIterator);
- return false;
- } else {
- return true;
- }
- }
-
- @Override
- public void describeTo(Description description) {
- description.appendText(
- "Records consumed by Flink should be identical to test
data "
- + "and preserve the order in multiple splits");
- }
-
- /**
- * Check if any pointing data is identical to the record from the
stream, and move the
- * pointer to next record if matched. Otherwise throws an exception.
- *
- * @param record Record from stream
- */
- private boolean matchThenNext(T record) {
- for (TestRecords<T> testRecordsList : testRecordsLists) {
- if (!testRecordsList.hasNext()) {
- continue;
- }
- if (record.equals(testRecordsList.current())) {
- testRecordsList.forward();
- return true;
- }
- }
- return false;
- }
-
- /**
- * Whether all pointers have reached the end of lists.
- *
- * @return True if all pointers have reached the end.
- */
- private boolean hasReachedEnd() {
- for (TestRecords<T> testRecordsList : testRecordsLists) {
- if (testRecordsList.hasNext()) {
- return false;
- }
- }
- return true;
- }
-
- String generateMismatchDescription(String reason, Iterator<T>
resultIterator) {
- final StringBuilder sb = new StringBuilder();
- sb.append(reason).append("\n");
- sb.append("Current progress of multiple split test data
validation:\n");
- int splitCounter = 0;
- for (TestRecords<T> testRecordsList : testRecordsLists) {
- sb.append(
- String.format(
- "Split %d (%d/%d): \n",
- splitCounter++,
- testRecordsList.offset,
- testRecordsList.records.size()));
- for (int recordIndex = 0;
- recordIndex < testRecordsList.records.size();
- recordIndex++) {
- sb.append(testRecordsList.records.get(recordIndex));
- if (recordIndex == testRecordsList.offset) {
- sb.append("\t<----");
- }
- sb.append("\n");
- }
- }
- if (resultIterator.hasNext()) {
- sb.append("Remaining received elements after the unexpected
one: \n");
- while (resultIterator.hasNext()) {
- sb.append(resultIterator.next()).append("\n");
- }
- }
- return sb.toString();
- }
-
- private static class TestRecords<T> {
- private int offset = 0;
- private final List<T> records;
-
- public TestRecords(List<T> records) {
- this.records = records;
- }
-
- public T current() {
- if (!hasNext()) {
- return null;
- }
- return records.get(offset);
- }
-
- public void forward() {
- ++offset;
- }
-
- public boolean hasNext() {
- return offset < records.size();
- }
- }
- }
-}
diff --git
a/flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertTest.java
b/flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertTest.java
new file mode 100644
index 0000000..7503e8e4
--- /dev/null
+++
b/flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat;
+
+/** Unit tests for {@link CollectIteratorAssert}. */
+public class CollectIteratorAssertTest {
+ @Nested
+ class MultipleSplitDataMatcherTest {
+ private final List<String> splitA = Arrays.asList("alpha", "beta",
"gamma");
+ private final List<String> splitB = Arrays.asList("one", "two",
"three");
+ private final List<String> splitC = Arrays.asList("1", "2", "3");
+ private final List<List<String>> testDataCollection =
Arrays.asList(splitA, splitB, splitC);
+
+ @Test
+ public void testDataMatcherWithExactlyOnceSemantic() {
+ final List<String> result = unionLists(splitA, splitB, splitC);
+ assertThat(result.iterator())
+ .matchesRecordsFromSource(testDataCollection,
CheckpointingMode.EXACTLY_ONCE);
+ }
+
+ @Test
+ public void testDataMatcherWithAtLeastOnceSemantic() {
+ final List<String> result = unionLists(splitA, splitB, splitC,
splitA);
+ assertThat(result.iterator())
+ .matchesRecordsFromSource(testDataCollection,
CheckpointingMode.AT_LEAST_ONCE);
+ }
+
+ @Test
+ public void testResultLessThanExpected() {
+ final ArrayList<String> splitATestDataWithoutLast = new
ArrayList<>(splitA);
+ splitATestDataWithoutLast.remove(splitA.size() - 1);
+ final List<String> result = unionLists(splitATestDataWithoutLast,
splitB, splitC);
+ Assertions.assertThatThrownBy(
+ () ->
+ assertThat(result.iterator())
+ .matchesRecordsFromSource(
+ testDataCollection,
+
CheckpointingMode.EXACTLY_ONCE))
+ .hasMessageContaining(
+ "Expected to have exactly 9 records in result, but
only received 8 records\n"
+ + "Current progress of multiple split test
data validation:\n"
+ + "Split 0 (2/3): \n"
+ + "alpha\n"
+ + "beta\n"
+ + "gamma\t<----\n"
+ + "Split 1 (3/3): \n"
+ + "one\n"
+ + "two\n"
+ + "three\n"
+ + "Split 2 (3/3): \n"
+ + "1\n"
+ + "2\n"
+ + "3\n");
+ }
+
+ @Test
+ public void testResultMoreThanExpected() {
+ final List<String> result = unionLists(splitA, splitB, splitC);
+ result.add("delta");
+ Assertions.assertThatThrownBy(
+ () ->
+ assertThat(result.iterator())
+ .matchesRecordsFromSource(
+ testDataCollection,
+
CheckpointingMode.EXACTLY_ONCE))
+ .hasMessageContaining(
+ "Expected to have exactly 9 records in result, but
received more records\n"
+ + "Current progress of multiple split test
data validation:\n"
+ + "Split 0 (3/3): \n"
+ + "alpha\n"
+ + "beta\n"
+ + "gamma\n"
+ + "Split 1 (3/3): \n"
+ + "one\n"
+ + "two\n"
+ + "three\n"
+ + "Split 2 (3/3): \n"
+ + "1\n"
+ + "2\n"
+ + "3\n");
+ }
+
+ @Test
+ public void testOutOfOrder() {
+ List<String> reverted = new ArrayList<>(splitC);
+ Collections.reverse(reverted);
+ final List<String> result = unionLists(splitA, splitB, reverted);
+ Assertions.assertThatThrownBy(
+ () ->
+ assertThat(result.iterator())
+ .matchesRecordsFromSource(
+ testDataCollection,
+
CheckpointingMode.EXACTLY_ONCE))
+ .hasMessageContaining(
+ "Unexpected record '3' at position 6\n"
+ + "Current progress of multiple split test
data validation:\n"
+ + "Split 0 (3/3): \n"
+ + "alpha\n"
+ + "beta\n"
+ + "gamma\n"
+ + "Split 1 (3/3): \n"
+ + "one\n"
+ + "two\n"
+ + "three\n"
+ + "Split 2 (0/3): \n"
+ + "1\t<----\n"
+ + "2\n"
+ + "3\n"
+ + "Remaining received elements after the
unexpected one: \n"
+ + "2\n"
+ + "1\n");
+ }
+ }
+
+ @SafeVarargs
+ private final <T> List<T> unionLists(List<T>... lists) {
+ return
Stream.of(lists).flatMap(Collection::stream).collect(Collectors.toList());
+ }
+}