This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f1e17c9  [FLINK-25840][tests] Add semantics test support for connector test framework
f1e17c9 is described below

commit f1e17c9f39b876537ae2065c6331baa5302bbddf
Author: Hang Ruan <[email protected]>
AuthorDate: Thu Jan 27 16:57:27 2022 +0800

    [FLINK-25840][tests] Add semantics test support for connector test framework
    
    This closes #18547
---
 .../connector/kafka/source/KafkaSourceITCase.java  |   4 +
 .../pulsar/source/PulsarSourceITCase.java          |   5 +
 .../flink/tests/util/kafka/KafkaSourceE2ECase.java |   5 +
 .../util/pulsar/PulsarSourceOrderedE2ECase.java    |   6 +
 .../util/pulsar/PulsarSourceUnorderedE2ECase.java  |   6 +
 .../flink/formats/avro/AvroBulkFormatITCase.java   |   6 +
 .../testframe/junit/annotations/TestSemantics.java |  37 +++
 .../extensions/ConnectorTestingExtension.java      |  13 +
 .../TestCaseInvocationContextProvider.java         |  62 ++++-
 .../testframe/testsuites/SourceTestSuiteBase.java  | 100 ++++---
 .../testframe/utils/CollectIteratorAssert.java     | 261 ++++++++++++++++++
 .../testframe/utils/CollectIteratorAssertions.java |  31 +++
 .../testframe/utils/TestDataMatchers.java          | 300 ---------------------
 .../testframe/utils/CollectIteratorAssertTest.java | 151 +++++++++++
 14 files changed, 649 insertions(+), 338 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
index c215077..71108d2 100644
--- 
a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
+++ 
b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/source/KafkaSourceITCase.java
@@ -33,7 +33,9 @@ import 
org.apache.flink.connector.testframe.external.DefaultContainerizedExterna
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
@@ -259,6 +261,8 @@ public class KafkaSourceITCase {
     /** Integration test based on connector testing framework. */
     @Nested
     class IntegrationTests extends SourceTestSuiteBase<String> {
+        @TestSemantics
+        CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
 
         // Defines test environment on Flink MiniCluster
         @SuppressWarnings("unused")
diff --git 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
index 5e5dca2..b28e449 100644
--- 
a/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
+++ 
b/flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
@@ -27,7 +27,9 @@ import 
org.apache.flink.connector.testframe.environment.MiniClusterTestEnvironme
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
 
 /** Unite test class for {@link PulsarSource}. */
 @SuppressWarnings("unused")
@@ -40,6 +42,9 @@ class PulsarSourceITCase extends SourceTestSuiteBase<String> {
     @TestExternalSystem
     PulsarTestEnvironment pulsar = new 
PulsarTestEnvironment(PulsarRuntime.mock());
 
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
     // Defines an external context Factories,
     // so test cases will be invoked using this external contexts.
     @TestContext
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
index 2b6e8d5..eae1eda 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/KafkaSourceE2ECase.java
@@ -23,7 +23,9 @@ import 
org.apache.flink.connector.testframe.external.DefaultContainerizedExterna
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.flink.FlinkContainerTestEnvironment;
 import org.apache.flink.util.DockerImageVersions;
@@ -40,6 +42,9 @@ import static 
org.apache.flink.connector.kafka.testutils.KafkaSourceExternalCont
 public class KafkaSourceE2ECase extends SourceTestSuiteBase<String> {
     private static final String KAFKA_HOSTNAME = "kafka";
 
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
     // Defines TestEnvironment
     @TestEnv FlinkContainerTestEnvironment flink = new 
FlinkContainerTestEnvironment(1, 6);
 
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
index 8641f50..7d22e80 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceOrderedE2ECase.java
@@ -23,7 +23,9 @@ import 
org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.pulsar.cases.ExclusiveSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.cases.FailoverSubscriptionContext;
 import 
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
@@ -36,6 +38,10 @@ import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.
  */
 public class PulsarSourceOrderedE2ECase extends SourceTestSuiteBase<String> {
 
+    // Defines the Semantic.
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
     // Defines TestEnvironment.
     @TestEnv
     FlinkContainerWithPulsarEnvironment flink = new 
FlinkContainerWithPulsarEnvironment(1, 6);
diff --git 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
index 4351147..d14d8f9 100644
--- 
a/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
+++ 
b/flink-end-to-end-tests/flink-end-to-end-tests-pulsar/src/test/java/org/apache/flink/tests/util/pulsar/PulsarSourceUnorderedE2ECase.java
@@ -23,6 +23,8 @@ import 
org.apache.flink.connector.pulsar.testutils.PulsarTestEnvironment;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.tests.util.pulsar.cases.KeySharedSubscriptionContext;
 import org.apache.flink.tests.util.pulsar.cases.SharedSubscriptionContext;
 import 
org.apache.flink.tests.util.pulsar.common.FlinkContainerWithPulsarEnvironment;
@@ -36,6 +38,10 @@ import static 
org.apache.flink.connector.pulsar.testutils.runtime.PulsarRuntime.
  */
 public class PulsarSourceUnorderedE2ECase extends 
UnorderedSourceTestSuiteBase<String> {
 
+    // Defines the Semantic.
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
     // Defines TestEnvironment.
     @TestEnv
     FlinkContainerWithPulsarEnvironment flink = new 
FlinkContainerWithPulsarEnvironment(1, 8);
diff --git 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java
 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java
index 86f0c15..8ccf344 100644
--- 
a/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java
+++ 
b/flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java
@@ -30,8 +30,10 @@ import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExte
 import 
org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
 import org.apache.flink.connector.testframe.testsuites.SourceTestSuiteBase;
 import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
+import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -65,6 +67,10 @@ public class AvroBulkFormatITCase extends 
SourceTestSuiteBase<RowData> {
     private static final RowDataSerializer SERIALIZER = new 
RowDataSerializer(ROW_TYPE);
 
     @SuppressWarnings("unused")
+    @TestSemantics
+    CheckpointingMode[] semantics = new CheckpointingMode[] 
{CheckpointingMode.EXACTLY_ONCE};
+
+    @SuppressWarnings("unused")
     @TestEnv
     MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
 
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
new file mode 100644
index 0000000..6e067ec
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/TestSemantics.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.junit.annotations;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks the field in a test class that defines the supported semantics, expressed as {@link
+ * org.apache.flink.streaming.api.CheckpointingMode} values.
+ *
+ * <p>Only one field in a test class can be annotated.
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+@Experimental
+public @interface TestSemantics {}
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
index 072abc2..beb945d 100644
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/ConnectorTestingExtension.java
@@ -25,6 +25,8 @@ import 
org.apache.flink.connector.testframe.external.ExternalContextFactory;
 import org.apache.flink.connector.testframe.junit.annotations.TestContext;
 import org.apache.flink.connector.testframe.junit.annotations.TestEnv;
 import 
org.apache.flink.connector.testframe.junit.annotations.TestExternalSystem;
+import org.apache.flink.connector.testframe.junit.annotations.TestSemantics;
+import org.apache.flink.streaming.api.CheckpointingMode;
 
 import org.junit.jupiter.api.extension.AfterAllCallback;
 import org.junit.jupiter.api.extension.BeforeAllCallback;
@@ -66,6 +68,7 @@ public class ConnectorTestingExtension implements 
BeforeAllCallback, AfterAllCal
     public static final String TEST_ENV_STORE_KEY = "testEnvironment";
     public static final String EXTERNAL_SYSTEM_STORE_KEY = "externalSystem";
     public static final String EXTERNAL_CONTEXT_FACTORIES_STORE_KEY = 
"externalContext";
+    public static final String SUPPORTED_SEMANTIC_STORE_KEY = 
"supportedSemantic";
 
     private TestEnvironment testEnvironment;
     private TestResource externalSystem;
@@ -93,6 +96,16 @@ public class ConnectorTestingExtension implements 
BeforeAllCallback, AfterAllCal
                     .put(EXTERNAL_SYSTEM_STORE_KEY, externalSystem);
         }
 
+        // Store supported semantic
+        final List<CheckpointingMode[]> semantics =
+                AnnotationSupport.findAnnotatedFieldValues(
+                        context.getRequiredTestInstance(),
+                        TestSemantics.class,
+                        CheckpointingMode[].class);
+        checkExactlyOneAnnotatedField(semantics, TestSemantics.class);
+        context.getStore(TEST_RESOURCE_NAMESPACE)
+                .put(SUPPORTED_SEMANTIC_STORE_KEY, semantics.get(0));
+
         // Search external context factories
         final List<ExternalContextFactory> externalContextFactories =
                 AnnotationSupport.findAnnotatedFieldValues(
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/TestCaseInvocationContextProvider.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/TestCaseInvocationContextProvider.java
index faf541d..b320928 100644
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/TestCaseInvocationContextProvider.java
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/extensions/TestCaseInvocationContextProvider.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.connector.testframe.environment.ClusterControllable;
 import org.apache.flink.connector.testframe.environment.TestEnvironment;
 import org.apache.flink.connector.testframe.external.ExternalContext;
 import org.apache.flink.connector.testframe.external.ExternalContextFactory;
+import org.apache.flink.streaming.api.CheckpointingMode;
 
 import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
 import org.junit.jupiter.api.extension.Extension;
@@ -34,10 +35,12 @@ import 
org.junit.jupiter.api.extension.TestTemplateInvocationContext;
 import org.junit.jupiter.api.extension.TestTemplateInvocationContextProvider;
 
 import java.util.Arrays;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.stream.Stream;
 
 import static 
org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension.EXTERNAL_CONTEXT_FACTORIES_STORE_KEY;
+import static 
org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension.SUPPORTED_SEMANTIC_STORE_KEY;
 import static 
org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension.TEST_ENV_STORE_KEY;
 import static 
org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension.TEST_RESOURCE_NAMESPACE;
 
@@ -77,13 +80,27 @@ public class TestCaseInvocationContextProvider implements 
TestTemplateInvocation
                         context.getStore(TEST_RESOURCE_NAMESPACE)
                                 .get(EXTERNAL_CONTEXT_FACTORIES_STORE_KEY);
 
+        // Fetch supported semantic from store
+        CheckpointingMode[] semantics =
+                (CheckpointingMode[])
+                        
context.getStore(TEST_RESOURCE_NAMESPACE).get(SUPPORTED_SEMANTIC_STORE_KEY);
+
         // Create an invocation context for each external context factory
         return externalContextFactories.stream()
-                .map(
-                        factory ->
-                                new TestResourceProvidingInvocationContext(
-                                        testEnv,
-                                        
factory.createExternalContext(context.getDisplayName())));
+                .flatMap(
+                        factory -> {
+                            List<TestResourceProvidingInvocationContext> 
result =
+                                    new LinkedList<>();
+                            for (CheckpointingMode semantic : semantics) {
+                                result.add(
+                                        new 
TestResourceProvidingInvocationContext(
+                                                testEnv,
+                                                factory.createExternalContext(
+                                                        
context.getDisplayName()),
+                                                semantic));
+                            }
+                            return result.stream();
+                        });
     }
 
     /**
@@ -94,18 +111,22 @@ public class TestCaseInvocationContextProvider implements 
TestTemplateInvocation
 
         private final TestEnvironment testEnvironment;
         private final ExternalContext externalContext;
+        private final CheckpointingMode semantic;
 
         public TestResourceProvidingInvocationContext(
-                TestEnvironment testEnvironment, ExternalContext 
externalContext) {
+                TestEnvironment testEnvironment,
+                ExternalContext externalContext,
+                CheckpointingMode semantic) {
             this.testEnvironment = testEnvironment;
             this.externalContext = externalContext;
+            this.semantic = semantic;
         }
 
         @Override
         public String getDisplayName(int invocationIndex) {
             return String.format(
-                    "TestEnvironment: [%s], ExternalContext: [%s]",
-                    testEnvironment, externalContext);
+                    "TestEnvironment: [%s], ExternalContext: [%s], Semantic: 
[%s]",
+                    testEnvironment, externalContext, semantic);
         }
 
         @Override
@@ -115,11 +136,36 @@ public class TestCaseInvocationContextProvider implements 
TestTemplateInvocation
                     new TestEnvironmentResolver(testEnvironment),
                     new ExternalContextProvider(externalContext),
                     new ClusterControllableProvider(testEnvironment),
+                    new SemanticResolver(semantic),
                     // Extension for closing external context
                     (AfterTestExecutionCallback) ignore -> 
externalContext.close());
         }
     }
 
+    private static class SemanticResolver implements ParameterResolver {
+
+        private final CheckpointingMode semantic;
+
+        private SemanticResolver(CheckpointingMode semantic) {
+            this.semantic = semantic;
+        }
+
+        @Override
+        public boolean supportsParameter(
+                ParameterContext parameterContext, ExtensionContext 
extensionContext)
+                throws ParameterResolutionException {
+            return isAssignableFromParameterType(
+                    CheckpointingMode.class, 
parameterContext.getParameter().getType());
+        }
+
+        @Override
+        public Object resolveParameter(
+                ParameterContext parameterContext, ExtensionContext 
extensionContext)
+                throws ParameterResolutionException {
+            return this.semantic;
+        }
+    }
+
     private static class TestEnvironmentResolver implements ParameterResolver {
 
         private final TestEnvironment testEnvironment;
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
index 3231fb1..5de4c5b 100644
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
@@ -33,9 +33,8 @@ import 
org.apache.flink.connector.testframe.external.source.DataStreamSourceExte
 import 
org.apache.flink.connector.testframe.external.source.TestingSourceSettings;
 import 
org.apache.flink.connector.testframe.junit.extensions.ConnectorTestingExtension;
 import 
org.apache.flink.connector.testframe.junit.extensions.TestCaseInvocationContextProvider;
-import org.apache.flink.connector.testframe.utils.TestDataMatchers;
+import org.apache.flink.connector.testframe.utils.CollectIteratorAssertions;
 import org.apache.flink.core.execution.JobClient;
-import org.apache.flink.runtime.testutils.CommonTestUtils;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -48,7 +47,6 @@ import 
org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
 import org.apache.flink.util.CloseableIterator;
 import org.apache.flink.util.TestLoggerExtension;
 
-import org.hamcrest.MatcherAssert;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.TestInstance;
 import org.junit.jupiter.api.TestTemplate;
@@ -59,11 +57,17 @@ import org.slf4j.LoggerFactory;
 
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static org.apache.flink.runtime.testutils.CommonTestUtils.terminateJob;
+import static 
org.apache.flink.runtime.testutils.CommonTestUtils.waitForJobStatus;
+import static org.assertj.core.api.Assertions.assertThat;
+
 /**
  * Base class for all test suites.
  *
@@ -103,13 +107,15 @@ public abstract class SourceTestSuiteBase<T> {
     @TestTemplate
     @DisplayName("Test source with single split")
     public void testSourceSingleSplit(
-            TestEnvironment testEnv, DataStreamSourceExternalContext<T> 
externalContext)
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<T> externalContext,
+            CheckpointingMode semantic)
             throws Exception {
         // Step 1: Preparation
         TestingSourceSettings sourceSettings =
                 TestingSourceSettings.builder()
                         .setBoundedness(Boundedness.BOUNDED)
-                        .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+                        .setCheckpointingMode(semantic)
                         .build();
         TestEnvironmentSettings envSettings =
                 TestEnvironmentSettings.builder()
@@ -128,11 +134,11 @@ public abstract class SourceTestSuiteBase<T> {
         CollectIteratorBuilder<T> iteratorBuilder = addCollectSink(stream);
         JobClient jobClient = submitJob(execEnv, "Source Single Split Test");
 
-        // Step 4: Validate test data
-        try (CloseableIterator<T> resultIterator = 
iteratorBuilder.build(jobClient)) {
+        // Validate test data
+        try (CollectResultIterator<T> resultIterator = 
iteratorBuilder.build(jobClient)) {
+            // Check test result
             LOG.info("Checking test results");
-            MatcherAssert.assertThat(
-                    resultIterator, 
TestDataMatchers.matchesSplitTestData(testRecords));
+            checkResultWithSemantic(resultIterator, 
Arrays.asList(testRecords), semantic, null);
         }
     }
 
@@ -151,13 +157,15 @@ public abstract class SourceTestSuiteBase<T> {
     @TestTemplate
     @DisplayName("Test source with multiple splits")
     public void testMultipleSplits(
-            TestEnvironment testEnv, DataStreamSourceExternalContext<T> 
externalContext)
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<T> externalContext,
+            CheckpointingMode semantic)
             throws Exception {
         // Step 1: Preparation
         TestingSourceSettings sourceSettings =
                 TestingSourceSettings.builder()
                         .setBoundedness(Boundedness.BOUNDED)
-                        .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+                        .setCheckpointingMode(semantic)
                         .build();
         TestEnvironmentSettings envOptions =
                 TestEnvironmentSettings.builder()
@@ -184,9 +192,7 @@ public abstract class SourceTestSuiteBase<T> {
         try (CloseableIterator<T> resultIterator = 
iteratorBuilder.build(jobClient)) {
             // Check test result
             LOG.info("Checking test results");
-            MatcherAssert.assertThat(
-                    resultIterator,
-                    
TestDataMatchers.matchesMultipleSplitTestData(testRecordsLists));
+            checkResultWithSemantic(resultIterator, testRecordsLists, 
semantic, null);
         }
     }
 
@@ -207,13 +213,15 @@ public abstract class SourceTestSuiteBase<T> {
     @TestTemplate
     @DisplayName("Test source with at least one idle parallelism")
     public void testIdleReader(
-            TestEnvironment testEnv, DataStreamSourceExternalContext<T> 
externalContext)
+            TestEnvironment testEnv,
+            DataStreamSourceExternalContext<T> externalContext,
+            CheckpointingMode semantic)
             throws Exception {
         // Step 1: Preparation
         TestingSourceSettings sourceSettings =
                 TestingSourceSettings.builder()
                         .setBoundedness(Boundedness.BOUNDED)
-                        .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+                        .setCheckpointingMode(semantic)
                         .build();
         TestEnvironmentSettings envOptions =
                 TestEnvironmentSettings.builder()
@@ -239,9 +247,7 @@ public abstract class SourceTestSuiteBase<T> {
         // Step 4: Validate test data
         try (CloseableIterator<T> resultIterator = 
iteratorBuilder.build(jobClient)) {
             LOG.info("Checking test results");
-            MatcherAssert.assertThat(
-                    resultIterator,
-                    
TestDataMatchers.matchesMultipleSplitTestData(testRecordsLists));
+            checkResultWithSemantic(resultIterator, testRecordsLists, 
semantic, null);
         }
     }
 
@@ -263,13 +269,14 @@ public abstract class SourceTestSuiteBase<T> {
     public void testTaskManagerFailure(
             TestEnvironment testEnv,
             DataStreamSourceExternalContext<T> externalContext,
-            ClusterControllable controller)
+            ClusterControllable controller,
+            CheckpointingMode semantic)
             throws Exception {
         // Step 1: Preparation
         TestingSourceSettings sourceSettings =
                 TestingSourceSettings.builder()
                         .setBoundedness(Boundedness.CONTINUOUS_UNBOUNDED)
-                        .setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
+                        .setCheckpointingMode(semantic)
                         .build();
         TestEnvironmentSettings envOptions =
                 TestEnvironmentSettings.builder()
@@ -302,17 +309,18 @@ public abstract class SourceTestSuiteBase<T> {
         // Step 4: Validate records before killing TaskManagers
         CloseableIterator<T> iterator = iteratorBuilder.build(jobClient);
         LOG.info("Checking records before killing TaskManagers");
-        MatcherAssert.assertThat(
+        checkResultWithSemantic(
                 iterator,
-                TestDataMatchers.matchesSplitTestData(
-                        testRecordsBeforeFailure, 
testRecordsBeforeFailure.size()));
+                Arrays.asList(testRecordsBeforeFailure),
+                semantic,
+                testRecordsBeforeFailure.size());
 
         // Step 5: Trigger TaskManager failover
         LOG.info("Trigger TaskManager failover");
         controller.triggerTaskManagerFailover(jobClient, () -> {});
 
         LOG.info("Waiting for job recovering from failure");
-        CommonTestUtils.waitForJobStatus(
+        waitForJobStatus(
                 jobClient,
                 Collections.singletonList(JobStatus.RUNNING),
                 Deadline.fromNow(Duration.ofSeconds(30)));
@@ -329,14 +337,15 @@ public abstract class SourceTestSuiteBase<T> {
 
         // Step 7: Validate test result
         LOG.info("Checking records after job failover");
-        MatcherAssert.assertThat(
+        checkResultWithSemantic(
                 iterator,
-                TestDataMatchers.matchesSplitTestData(
-                        testRecordsAfterFailure, 
testRecordsAfterFailure.size()));
+                Arrays.asList(testRecordsAfterFailure),
+                semantic,
+                testRecordsAfterFailure.size());
 
         // Step 8: Clean up
-        CommonTestUtils.terminateJob(jobClient, Duration.ofSeconds(30));
-        CommonTestUtils.waitForJobStatus(
+        terminateJob(jobClient, Duration.ofSeconds(30));
+        waitForJobStatus(
                 jobClient,
                 Collections.singletonList(JobStatus.CANCELED),
                 Deadline.fromNow(Duration.ofSeconds(30)));
@@ -400,6 +409,37 @@ public abstract class SourceTestSuiteBase<T> {
                 stream.getExecutionEnvironment().getCheckpointConfig());
     }
 
+    /**
+     * Compare the test data with the result.
+     *
+     * <p>If the source is bounded, limit should be null.
+     *
+     * @param resultIterator the data read from the job
+     * @param testData the test data
+     * @param semantic the supported semantic, see {@link CheckpointingMode}
+     * @param limit expected number of the data to read from the job
+     */
+    private void checkResultWithSemantic(
+            CloseableIterator<T> resultIterator,
+            List<List<T>> testData,
+            CheckpointingMode semantic,
+            Integer limit) {
+        if (limit != null) {
+            assertThat(
+                            CompletableFuture.supplyAsync(
+                                    () -> {
+                                        
CollectIteratorAssertions.assertThat(resultIterator)
+                                                .withNumRecordsLimit(limit)
+                                                
.matchesRecordsFromSource(testData, semantic);
+                                        return true;
+                                    }))
+                    .succeedsWithin(Duration.ofSeconds(30));
+        } else {
+            CollectIteratorAssertions.assertThat(resultIterator)
+                    .matchesRecordsFromSource(testData, semantic);
+        }
+    }
+
     /** Builder class for constructing {@link CollectResultIterator} of 
collect sink. */
     protected static class CollectIteratorBuilder<T> {
 
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssert.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssert.java
new file mode 100644
index 0000000..5a291bd
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssert.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.assertj.core.api.AbstractAssert;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+/**
+ * This assertion is used to compare records in the collect iterator to the 
target test data under
+ * different semantics (AT_LEAST_ONCE, EXACTLY_ONCE).
+ *
+ * @param <T> The type of records in the test data and collect iterator
+ */
+public class CollectIteratorAssert<T>
+        extends AbstractAssert<CollectIteratorAssert<T>, Iterator<T>> {
+
+    private final Iterator<T> collectorIterator;
+    private final List<RecordsFromSplit<T>> recordsFromSplits = new 
ArrayList<>();
+    private int totalNumRecords;
+    private Integer limit = null;
+
+    protected CollectIteratorAssert(Iterator<T> collectorIterator) {
+        super(collectorIterator, CollectIteratorAssert.class);
+        this.collectorIterator = collectorIterator;
+    }
+
+    public CollectIteratorAssert<T> withNumRecordsLimit(int limit) {
+        this.limit = limit;
+        return this;
+    }
+
+    public void matchesRecordsFromSource(
+            List<List<T>> recordsBySplitsFromSource, CheckpointingMode 
semantic) {
+        for (List<T> recordsFromSplit : recordsBySplitsFromSource) {
+            recordsFromSplits.add(new RecordsFromSplit<>(recordsFromSplit));
+            totalNumRecords += recordsFromSplit.size();
+        }
+
+        if (limit != null && limit > totalNumRecords) {
+            throw new IllegalArgumentException(
+                    "Limit validation size should be less than total number of 
records from source");
+        }
+
+        switch (semantic) {
+            case AT_LEAST_ONCE:
+                compareWithAtLeastOnceSemantic(collectorIterator, 
recordsFromSplits);
+                break;
+            case EXACTLY_ONCE:
+                compareWithExactlyOnceSemantic(collectorIterator, 
recordsFromSplits);
+                break;
+            default:
+                throw new IllegalArgumentException(
+                        String.format("Unrecognized semantic \"%s\"", 
semantic));
+        }
+    }
+
+    private void compareWithAtLeastOnceSemantic(
+            Iterator<T> resultIterator, List<RecordsFromSplit<T>> 
recordsFromSplits) {
+        List<T> duplicateRead = new LinkedList<>();
+
+        int recordCounter = 0;
+        while (resultIterator.hasNext()) {
+            final T record = resultIterator.next();
+            if (!matchThenNext(record)) {
+                duplicateRead.add(record);
+            } else {
+                recordCounter++;
+            }
+
+            if (limit != null && recordCounter >= limit) {
+                break;
+            }
+        }
+        if (limit == null && !hasReachedEnd()) {
+            failWithMessage(
+                    generateMismatchDescription(
+                            String.format(
+                                    "Expected to have at least %d records in 
result, but only received %d records",
+                                    recordsFromSplits.stream()
+                                            .mapToInt(
+                                                    recordsFromSplit ->
+                                                            
recordsFromSplit.records.size())
+                                            .sum(),
+                                    recordCounter),
+                            resultIterator));
+        } else {
+            confirmDuplicateRead(duplicateRead);
+        }
+    }
+
+    private void compareWithExactlyOnceSemantic(
+            Iterator<T> resultIterator, List<RecordsFromSplit<T>> 
recordsFromSplits) {
+        int recordCounter = 0;
+        while (resultIterator.hasNext()) {
+            final T record = resultIterator.next();
+            if (!matchThenNext(record)) {
+                if (recordCounter >= totalNumRecords) {
+                    failWithMessage(
+                            generateMismatchDescription(
+                                    String.format(
+                                            "Expected to have exactly %d 
records in result, but received more records",
+                                            recordsFromSplits.stream()
+                                                    .mapToInt(
+                                                            recordsFromSplit ->
+                                                                    
recordsFromSplit.records.size())
+                                                    .sum()),
+                                    resultIterator));
+                } else {
+                    failWithMessage(
+                            generateMismatchDescription(
+                                    String.format(
+                                            "Unexpected record '%s' at 
position %d",
+                                            record, recordCounter),
+                                    resultIterator));
+                }
+            }
+            recordCounter++;
+            if (limit != null && recordCounter >= limit) {
+                break;
+            }
+        }
+        if (limit == null && !hasReachedEnd()) {
+            failWithMessage(
+                    generateMismatchDescription(
+                            String.format(
+                                    "Expected to have exactly %d records in 
result, but only received %d records",
+                                    recordsFromSplits.stream()
+                                            .mapToInt(
+                                                    recordsFromSplit ->
+                                                            
recordsFromSplit.records.size())
+                                            .sum(),
+                                    recordCounter),
+                            resultIterator));
+        }
+    }
+
+    private void confirmDuplicateRead(List<T> duplicateRead) {
+        for (T record : duplicateRead) {
+            boolean found = false;
+            for (RecordsFromSplit<T> recordsFromSplit : recordsFromSplits) {
+                if (recordsFromSplit.records.contains(record)) {
+                    found = true;
+                    break;
+                }
+            }
+            if (!found) {
+                failWithMessage(String.format("Unexpected duplicate record 
'%s'", record));
+            }
+        }
+    }
+
+    /**
+     * Check if the current record of any split is identical to the record from the stream, 
and move that split's pointer
+     * to the next record if matched.
+     *
+     * @param record Record from stream
+     */
+    private boolean matchThenNext(T record) {
+        for (RecordsFromSplit<T> recordsFromSplit : recordsFromSplits) {
+            if (!recordsFromSplit.hasNext()) {
+                continue;
+            }
+            if (record.equals(recordsFromSplit.current())) {
+                recordsFromSplit.forward();
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Whether all pointers have reached the end of lists.
+     *
+     * @return True if all pointers have reached the end.
+     */
+    private boolean hasReachedEnd() {
+        for (RecordsFromSplit<T> recordsFromSplit : recordsFromSplits) {
+            if (recordsFromSplit.hasNext()) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private String generateMismatchDescription(String reason, Iterator<T> 
resultIterator) {
+        final StringBuilder sb = new StringBuilder();
+        sb.append(reason).append("\n");
+        sb.append("Current progress of multiple split test data 
validation:\n");
+        int splitCounter = 0;
+        for (RecordsFromSplit<T> recordsFromSplit : recordsFromSplits) {
+            sb.append(
+                    String.format(
+                            "Split %d (%d/%d): \n",
+                            splitCounter++,
+                            recordsFromSplit.offset,
+                            recordsFromSplit.records.size()));
+            for (int recordIndex = 0;
+                    recordIndex < recordsFromSplit.records.size();
+                    recordIndex++) {
+                sb.append(recordsFromSplit.records.get(recordIndex));
+                if (recordIndex == recordsFromSplit.offset) {
+                    sb.append("\t<----");
+                }
+                sb.append("\n");
+            }
+        }
+        if (resultIterator.hasNext()) {
+            sb.append("Remaining received elements after the unexpected one: 
\n");
+            while (resultIterator.hasNext()) {
+                sb.append(resultIterator.next()).append("\n");
+            }
+        }
+        return sb.toString();
+    }
+
+    private static class RecordsFromSplit<T> {
+        private int offset = 0;
+        private final List<T> records;
+
+        public RecordsFromSplit(List<T> records) {
+            this.records = records;
+        }
+
+        public T current() {
+            if (!hasNext()) {
+                return null;
+            }
+            return records.get(offset);
+        }
+
+        public void forward() {
+            ++offset;
+        }
+
+        public boolean hasNext() {
+            return offset < records.size();
+        }
+    }
+}
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java
new file mode 100644
index 0000000..8bbb529
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertions.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import java.util.Iterator;
+
+/**
+ * Entry point for assertion methods for {@link CollectIteratorAssert}. Each 
method in this class is
+ * a static factory.
+ */
+public class CollectIteratorAssertions {
+    public static <T> CollectIteratorAssert<T> assertThat(Iterator<T> actual) {
+        return new CollectIteratorAssert<>(actual);
+    }
+}
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestDataMatchers.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestDataMatchers.java
deleted file mode 100644
index 2420fde..0000000
--- 
a/flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestDataMatchers.java
+++ /dev/null
@@ -1,300 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.testframe.utils;
-
-import org.apache.flink.annotation.Internal;
-
-import org.hamcrest.Description;
-import org.hamcrest.TypeSafeDiagnosingMatcher;
-
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-
-/** Matchers for validating test data. */
-@Internal
-public class TestDataMatchers {
-
-    // ----------------------------  Matcher Builders 
----------------------------------
-    public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
-            List<List<T>> testRecordsLists) {
-        return new MultipleSplitDataMatcher<>(testRecordsLists);
-    }
-
-    public static <T> SingleSplitDataMatcher<T> matchesSplitTestData(List<T> 
testData) {
-        return new SingleSplitDataMatcher<>(testData);
-    }
-
-    public static <T> SingleSplitDataMatcher<T> matchesSplitTestData(List<T> 
testData, int limit) {
-        return new SingleSplitDataMatcher<>(testData, limit);
-    }
-
-    // ---------------------------- Matcher Definitions 
--------------------------------
-
-    /**
-     * Matcher for validating test data in a single split.
-     *
-     * @param <T> Type of validating record
-     */
-    public static class SingleSplitDataMatcher<T> extends 
TypeSafeDiagnosingMatcher<Iterator<T>> {
-        private static final int UNSET = -1;
-
-        private final List<T> testData;
-        private final int limit;
-
-        private String mismatchDescription = null;
-
-        public SingleSplitDataMatcher(List<T> testData) {
-            this.testData = testData;
-            this.limit = UNSET;
-        }
-
-        public SingleSplitDataMatcher(List<T> testData, int limit) {
-            if (limit > testData.size()) {
-                throw new IllegalArgumentException(
-                        "Limit validation size should be less than number of 
test records");
-            }
-            this.testData = testData;
-            this.limit = limit;
-        }
-
-        @Override
-        protected boolean matchesSafely(Iterator<T> resultIterator, 
Description description) {
-            if (mismatchDescription != null) {
-                description.appendText(mismatchDescription);
-                return false;
-            }
-
-            boolean dataMismatch = false;
-            boolean sizeMismatch = false;
-            String sizeMismatchDescription = "";
-            String dataMismatchDescription = "";
-            int recordCounter = 0;
-            for (T testRecord : testData) {
-                if (!resultIterator.hasNext()) {
-                    sizeMismatchDescription =
-                            String.format(
-                                    "Expected to have %d records in result, 
but only received %d records",
-                                    limit == UNSET ? testData.size() : limit, 
recordCounter);
-                    sizeMismatch = true;
-                    break;
-                }
-                T resultRecord = resultIterator.next();
-                if (!testRecord.equals(resultRecord)) {
-                    dataMismatchDescription =
-                            String.format(
-                                    "Mismatched record at position %d: 
Expected '%s' but was '%s'",
-                                    recordCounter, testRecord, resultRecord);
-                    dataMismatch = true;
-                }
-                recordCounter++;
-                if (limit != UNSET && recordCounter >= limit) {
-                    break;
-                }
-            }
-
-            if (limit == UNSET && resultIterator.hasNext()) {
-                sizeMismatchDescription =
-                        String.format(
-                                "Expected to have exactly %d records in 
result, "
-                                        + "but result iterator hasn't reached 
the end",
-                                testData.size());
-                sizeMismatch = true;
-            }
-
-            if (dataMismatch && sizeMismatch) {
-                mismatchDescription = sizeMismatchDescription + " And " + 
dataMismatchDescription;
-                return false;
-            } else if (dataMismatch) {
-                mismatchDescription = dataMismatchDescription;
-                return false;
-            } else if (sizeMismatch) {
-                mismatchDescription = sizeMismatchDescription;
-                return false;
-            } else {
-                return true;
-            }
-        }
-
-        @Override
-        public void describeTo(Description description) {
-            description.appendText(
-                    "Records consumed by Flink should be identical to test 
data "
-                            + "and preserve the order in split");
-        }
-    }
-
-    /**
-     * Matcher for validating test data from multiple splits.
-     *
-     * <p>Each list has a pointer (iterator) pointing to current checking 
record. When a record is
-     * received in the stream, it will be compared to all current pointing 
records in lists, and the
-     * pointer to the identical record will move forward.
-     *
-     * <p>If the stream preserves the correctness and order of records in all 
splits, all pointers
-     * should reach the end of the list finally.
-     *
-     * @param <T> Type of validating record
-     */
-    public static class MultipleSplitDataMatcher<T> extends 
TypeSafeDiagnosingMatcher<Iterator<T>> {
-
-        List<TestRecords<T>> testRecordsLists = new ArrayList<>();
-
-        private String mismatchDescription = null;
-
-        public MultipleSplitDataMatcher(List<List<T>> testData) {
-            for (List<T> testRecordsList : testData) {
-                this.testRecordsLists.add(new TestRecords<>(testRecordsList));
-            }
-        }
-
-        @Override
-        protected boolean matchesSafely(Iterator<T> resultIterator, 
Description description) {
-            if (mismatchDescription != null) {
-                description.appendText(mismatchDescription);
-                return false;
-            }
-
-            int recordCounter = 0;
-            while (resultIterator.hasNext()) {
-                final T record = resultIterator.next();
-                if (!matchThenNext(record)) {
-                    this.mismatchDescription =
-                            generateMismatchDescription(
-                                    String.format(
-                                            "Unexpected record '%s' at 
position %d",
-                                            record, recordCounter),
-                                    resultIterator);
-                    return false;
-                }
-                recordCounter++;
-            }
-
-            if (!hasReachedEnd()) {
-                this.mismatchDescription =
-                        generateMismatchDescription(
-                                String.format(
-                                        "Expected to have exactly %d records 
in result, but only received %d records",
-                                        testRecordsLists.stream()
-                                                .mapToInt(list -> 
list.records.size())
-                                                .sum(),
-                                        recordCounter),
-                                resultIterator);
-                return false;
-            } else {
-                return true;
-            }
-        }
-
-        @Override
-        public void describeTo(Description description) {
-            description.appendText(
-                    "Records consumed by Flink should be identical to test 
data "
-                            + "and preserve the order in multiple splits");
-        }
-
-        /**
-         * Check if any pointing data is identical to the record from the 
stream, and move the
-         * pointer to next record if matched. Otherwise throws an exception.
-         *
-         * @param record Record from stream
-         */
-        private boolean matchThenNext(T record) {
-            for (TestRecords<T> testRecordsList : testRecordsLists) {
-                if (!testRecordsList.hasNext()) {
-                    continue;
-                }
-                if (record.equals(testRecordsList.current())) {
-                    testRecordsList.forward();
-                    return true;
-                }
-            }
-            return false;
-        }
-
-        /**
-         * Whether all pointers have reached the end of lists.
-         *
-         * @return True if all pointers have reached the end.
-         */
-        private boolean hasReachedEnd() {
-            for (TestRecords<T> testRecordsList : testRecordsLists) {
-                if (testRecordsList.hasNext()) {
-                    return false;
-                }
-            }
-            return true;
-        }
-
-        String generateMismatchDescription(String reason, Iterator<T> 
resultIterator) {
-            final StringBuilder sb = new StringBuilder();
-            sb.append(reason).append("\n");
-            sb.append("Current progress of multiple split test data 
validation:\n");
-            int splitCounter = 0;
-            for (TestRecords<T> testRecordsList : testRecordsLists) {
-                sb.append(
-                        String.format(
-                                "Split %d (%d/%d): \n",
-                                splitCounter++,
-                                testRecordsList.offset,
-                                testRecordsList.records.size()));
-                for (int recordIndex = 0;
-                        recordIndex < testRecordsList.records.size();
-                        recordIndex++) {
-                    sb.append(testRecordsList.records.get(recordIndex));
-                    if (recordIndex == testRecordsList.offset) {
-                        sb.append("\t<----");
-                    }
-                    sb.append("\n");
-                }
-            }
-            if (resultIterator.hasNext()) {
-                sb.append("Remaining received elements after the unexpected 
one: \n");
-                while (resultIterator.hasNext()) {
-                    sb.append(resultIterator.next()).append("\n");
-                }
-            }
-            return sb.toString();
-        }
-
-        private static class TestRecords<T> {
-            private int offset = 0;
-            private final List<T> records;
-
-            public TestRecords(List<T> records) {
-                this.records = records;
-            }
-
-            public T current() {
-                if (!hasNext()) {
-                    return null;
-                }
-                return records.get(offset);
-            }
-
-            public void forward() {
-                ++offset;
-            }
-
-            public boolean hasNext() {
-                return offset < records.size();
-            }
-        }
-    }
-}
diff --git 
a/flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertTest.java
 
b/flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertTest.java
new file mode 100644
index 0000000..7503e8e4
--- /dev/null
+++ 
b/flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/utils/CollectIteratorAssertTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.connector.testframe.utils.CollectIteratorAssertions.assertThat;
+
+/** Unit tests for {@link CollectIteratorAssert}. */
+public class CollectIteratorAssertTest {
+    @Nested
+    class MultipleSplitDataMatcherTest {
+        private final List<String> splitA = Arrays.asList("alpha", "beta", 
"gamma");
+        private final List<String> splitB = Arrays.asList("one", "two", 
"three");
+        private final List<String> splitC = Arrays.asList("1", "2", "3");
+        private final List<List<String>> testDataCollection = 
Arrays.asList(splitA, splitB, splitC);
+
+        @Test
+        public void testDataMatcherWithExactlyOnceSemantic() {
+            final List<String> result = unionLists(splitA, splitB, splitC);
+            assertThat(result.iterator())
+                    .matchesRecordsFromSource(testDataCollection, 
CheckpointingMode.EXACTLY_ONCE);
+        }
+
+        @Test
+        public void testDataMatcherWithAtLeastOnceSemantic() {
+            final List<String> result = unionLists(splitA, splitB, splitC, 
splitA);
+            assertThat(result.iterator())
+                    .matchesRecordsFromSource(testDataCollection, 
CheckpointingMode.AT_LEAST_ONCE);
+        }
+
+        @Test
+        public void testResultLessThanExpected() {
+            final ArrayList<String> splitATestDataWithoutLast = new 
ArrayList<>(splitA);
+            splitATestDataWithoutLast.remove(splitA.size() - 1);
+            final List<String> result = unionLists(splitATestDataWithoutLast, 
splitB, splitC);
+            Assertions.assertThatThrownBy(
+                            () ->
+                                    assertThat(result.iterator())
+                                            .matchesRecordsFromSource(
+                                                    testDataCollection,
+                                                    
CheckpointingMode.EXACTLY_ONCE))
+                    .hasMessageContaining(
+                            "Expected to have exactly 9 records in result, but 
only received 8 records\n"
+                                    + "Current progress of multiple split test 
data validation:\n"
+                                    + "Split 0 (2/3): \n"
+                                    + "alpha\n"
+                                    + "beta\n"
+                                    + "gamma\t<----\n"
+                                    + "Split 1 (3/3): \n"
+                                    + "one\n"
+                                    + "two\n"
+                                    + "three\n"
+                                    + "Split 2 (3/3): \n"
+                                    + "1\n"
+                                    + "2\n"
+                                    + "3\n");
+        }
+
+        @Test
+        public void testResultMoreThanExpected() {
+            final List<String> result = unionLists(splitA, splitB, splitC);
+            result.add("delta");
+            Assertions.assertThatThrownBy(
+                            () ->
+                                    assertThat(result.iterator())
+                                            .matchesRecordsFromSource(
+                                                    testDataCollection,
+                                                    
CheckpointingMode.EXACTLY_ONCE))
+                    .hasMessageContaining(
+                            "Expected to have exactly 9 records in result, but 
received more records\n"
+                                    + "Current progress of multiple split test 
data validation:\n"
+                                    + "Split 0 (3/3): \n"
+                                    + "alpha\n"
+                                    + "beta\n"
+                                    + "gamma\n"
+                                    + "Split 1 (3/3): \n"
+                                    + "one\n"
+                                    + "two\n"
+                                    + "three\n"
+                                    + "Split 2 (3/3): \n"
+                                    + "1\n"
+                                    + "2\n"
+                                    + "3\n");
+        }
+
+        @Test
+        public void testOutOfOrder() {
+            List<String> reverted = new ArrayList<>(splitC);
+            Collections.reverse(reverted);
+            final List<String> result = unionLists(splitA, splitB, reverted);
+            Assertions.assertThatThrownBy(
+                            () ->
+                                    assertThat(result.iterator())
+                                            .matchesRecordsFromSource(
+                                                    testDataCollection,
+                                                    
CheckpointingMode.EXACTLY_ONCE))
+                    .hasMessageContaining(
+                            "Unexpected record '3' at position 6\n"
+                                    + "Current progress of multiple split test 
data validation:\n"
+                                    + "Split 0 (3/3): \n"
+                                    + "alpha\n"
+                                    + "beta\n"
+                                    + "gamma\n"
+                                    + "Split 1 (3/3): \n"
+                                    + "one\n"
+                                    + "two\n"
+                                    + "three\n"
+                                    + "Split 2 (0/3): \n"
+                                    + "1\t<----\n"
+                                    + "2\n"
+                                    + "3\n"
+                                    + "Remaining received elements after the 
unexpected one: \n"
+                                    + "2\n"
+                                    + "1\n");
+        }
+    }
+
+    @SafeVarargs
+    private final <T> List<T> unionLists(List<T>... lists) {
+        return 
Stream.of(lists).flatMap(Collection::stream).collect(Collectors.toList());
+    }
+}

Reply via email to