leonardBang commented on a change in pull request #18516:
URL: https://github.com/apache/flink/pull/18516#discussion_r804510698



##########
File path: 
flink-formats/flink-avro/src/test/java/org/apache/flink/formats/avro/AvroBulkFormatITCase.java
##########
@@ -64,6 +66,10 @@
 
     private static final RowDataSerializer SERIALIZER = new RowDataSerializer(ROW_TYPE);
 
+    @SuppressWarnings("unused")

Review comment:
      hint: add `@SuppressWarnings("unused")` to all annotated fields that are otherwise flagged as unused
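   For illustration, a minimal sketch of what this looks like on a framework-injected field (the annotation and field here are hypothetical, modeled on the testing-framework annotations in this PR):
   ```java
   // The field is only read reflectively by the test framework, so the
   // compiler/IDE would otherwise flag it as unused.
   @SuppressWarnings("unused")
   @TestEnv
   MiniClusterTestEnvironment flink = new MiniClusterTestEnvironment();
   ```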

##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java
##########
@@ -71,7 +82,18 @@ public MiniClusterTestEnvironment() {
     @Override
     public StreamExecutionEnvironment createExecutionEnvironment(
             TestEnvironmentSettings envOptions) {
-        return StreamExecutionEnvironment.getExecutionEnvironment();
+        Configuration configuration = new Configuration();
+        if (envOptions.getSavepointRestorePath() != null) {
+            configuration.set(SAVEPOINT_PATH, envOptions.getSavepointRestorePath());

Review comment:
       ```suggestion
            configuration.setString(SAVEPOINT_PATH, envOptions.getSavepointRestorePath());
   ```
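   Side note, as a hedged sketch (assuming `SAVEPOINT_PATH` is the statically imported `SavepointConfigOptions.SAVEPOINT_PATH`, a `ConfigOption<String>`): both setters should behave the same for a String-typed option, the suggestion just makes the string nature explicit.
   ```java
   Configuration configuration = new Configuration();
   // Typed variant from the diff; compiles because the option is String-typed.
   configuration.set(SAVEPOINT_PATH, envOptions.getSavepointRestorePath());
   // String variant from the suggestion; equivalent for a ConfigOption<String>.
   configuration.setString(SAVEPOINT_PATH, envOptions.getSavepointRestorePath());
   ```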

##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/environment/MiniClusterTestEnvironment.java
##########
@@ -53,9 +61,12 @@
     private boolean isStarted = false;
 
     public MiniClusterTestEnvironment() {
+        Configuration conf = new Configuration();
+        conf.set(METRIC_FETCHER_UPDATE_INTERVAL, 1000L);

Review comment:
      minor: use a constant for this test-specific configuration value; a sketch follows.
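   A minimal sketch of the suggested extraction (the constant name is hypothetical):
   ```java
   // Test-specific default as a named constant instead of a magic number.
   private static final long METRIC_FETCHER_UPDATE_INTERVAL_MS = 1000L;

   public MiniClusterTestEnvironment() {
       Configuration conf = new Configuration();
       conf.set(METRIC_FETCHER_UPDATE_INTERVAL, METRIC_FETCHER_UPDATE_INTERVAL_MS);
       // ...
   }
   ```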

##########
File path: 
flink-end-to-end-tests/flink-end-to-end-tests-common/src/main/java/org/apache/flink/tests/util/flink/FlinkContainerTestEnvironment.java
##########
@@ -68,6 +71,7 @@ public FlinkContainerTestEnvironment(
         config.set(HEARTBEAT_INTERVAL, 1000L);
         config.set(HEARTBEAT_TIMEOUT, 5000L);
         config.set(SLOT_REQUEST_TIMEOUT, 10000L);
+        config.set(METRIC_FETCHER_UPDATE_INTERVAL, 1000L);

Review comment:
      We could use constants for these magic numbers; IIUC they are the default values used for tests.
   BTW, could connector developers pass a custom `Configuration`? One possible shape is sketched below.
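   A hedged sketch of that idea (this overload is hypothetical, not part of the PR): apply the test defaults first, then let caller-supplied options win.
   ```java
   public FlinkContainerTestEnvironment(Configuration customConfig, int numTaskManagers, int numSlots) {
       Configuration config = new Configuration();
       config.set(HEARTBEAT_INTERVAL, 1000L);
       config.set(HEARTBEAT_TIMEOUT, 5000L);
       config.set(SLOT_REQUEST_TIMEOUT, 10000L);
       config.set(METRIC_FETCHER_UPDATE_INTERVAL, 1000L);
       // Caller-supplied options override the test defaults.
       config.addAll(customConfig);
       // ... continue with the existing container setup
   }
   ```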
   

##########
File path: 
flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/testutils/KafkaSourceExternalContext.java
##########
@@ -58,20 +58,20 @@
 public class KafkaSourceExternalContext implements DataStreamSourceExternalContext<String> {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceExternalContext.class);
-    private static final String TOPIC_NAME_PREFIX = "kafka-test-topic-";
-    private static final Pattern TOPIC_NAME_PATTERN = Pattern.compile(TOPIC_NAME_PREFIX + ".*");
-    private static final String GROUP_ID_PREFIX = "kafka-source-external-context-";
+    protected static final String TOPIC_NAME_PREFIX = "kafka-test-topic-";
+    protected static final Pattern TOPIC_NAME_PATTERN = Pattern.compile(TOPIC_NAME_PREFIX + ".*");
+    protected static final String GROUP_ID_PREFIX = "kafka-source-external-context-";
     private static final int NUM_RECORDS_UPPER_BOUND = 500;
     private static final int NUM_RECORDS_LOWER_BOUND = 100;
 
     private final List<URL> connectorJarPaths;
-    private final String bootstrapServers;
-    private final String topicName;
-    private final SplitMappingMode splitMappingMode;
-    private final AdminClient adminClient;
+    protected final String bootstrapServers;
+    protected final String topicName;
+    protected final SplitMappingMode splitMappingMode;
+    protected final AdminClient adminClient;

Review comment:
      Do we need to widen this visibility? I didn't find any subclass that accesses these.

##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/junit/annotations/Semantic.java
##########
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.junit.annotations;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.lang.annotation.ElementType;
+import java.lang.annotation.Retention;
+import java.lang.annotation.RetentionPolicy;
+import java.lang.annotation.Target;
+
+/**
+ * Marks the field in test class defining supported semantic: {@link
+ * org.apache.flink.streaming.api.CheckpointingMode}.
+ *
+ * <p>Only one field can be annotated in test class.
+ */
+@Target(ElementType.FIELD)
+@Retention(RetentionPolicy.RUNTIME)
+@Experimental
+public @interface Semantic {}

Review comment:
       ```suggestion
   public @interface TestSemantics {}
   ```

   We can use it in the following way, IIUC, right?
   ```
   @TestSemantics
   CheckpointingMode[] testSemantics =
           new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE};
   ```

##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQueryer.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** The queryer is used to get job metrics by rest API. */

Review comment:
       ```suggestion
   /** The querier used to get job metrics via the REST API. */
   ```

##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestUtils.java
##########
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.connector.testframe.external.ExternalSystemDataReader;
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import java.io.File;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.attribute.FileAttribute;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.api.CheckpointingMode.AT_LEAST_ONCE;
+import static org.apache.flink.streaming.api.CheckpointingMode.EXACTLY_ONCE;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Test utils. */
+public class TestUtils {
+    public static File newFolder(Path path) throws IOException {
+        Path tempPath = Files.createTempDirectory(path, "testing-framework", new FileAttribute[0]);
+        return tempPath.toFile();
+    }
+
+    public static <T> List<T> appendResultData(
+            List<T> result,
+            ExternalSystemDataReader<T> reader,
+            List<T> expected,
+            int retryTimes,
+            CheckpointingMode semantic) {
+        long timeoutMs = 1000L;
+        int retryIndex = 0;
+        if (EXACTLY_ONCE.equals(semantic)) {
+            while (retryIndex++ < retryTimes && result.size() < expected.size()) {
+                result.addAll(reader.poll(Duration.ofMillis(timeoutMs)));
+            }
+            return result;
+        } else if (AT_LEAST_ONCE.equals(semantic)) {
+            while (retryIndex++ < retryTimes && !containSameVal(expected, result, semantic)) {
+                result.addAll(reader.poll(Duration.ofMillis(timeoutMs)));
+            }
+            return result;
+        }
+        throw new IllegalStateException(
+                String.format("%s delivery guarantee doesn't support test.", 
semantic.name()));
+    }

Review comment:
      Are these methods ever used?
   
   

##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/MetricQueryer.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.testframe.environment.TestEnvironment;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.RestClient;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsHeaders;
+import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.util.ConfigurationException;
+import org.apache.flink.util.StringUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Iterator;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** The queryer is used to get job metrics by rest API. */
+public class MetricQueryer {
+    private static final Logger LOG = LoggerFactory.getLogger(MetricQueryer.class);
+    private RestClient restClient;
+
+    public MetricQueryer(Configuration configuration, Executor executor)
+            throws ConfigurationException {
+        restClient = new RestClient(configuration, executor);
+    }
+
+    public JobDetailsInfo getJobDetails(TestEnvironment.Endpoint endpoint, JobID jobId)
+            throws Exception {
+        String jmAddress = endpoint.getAddress();
+        int jmPort = endpoint.getPort();
+
+        final JobMessageParameters params = new JobMessageParameters();
+        params.jobPathParameter.resolve(jobId);
+
+        return restClient
+                .sendRequest(
+                        jmAddress,
+                        jmPort,
+                        JobDetailsHeaders.getInstance(),
+                        params,
+                        EmptyRequestBody.getInstance())
+                .get(30, TimeUnit.SECONDS);
+    }
+
+    public AggregatedMetricsResponseBody getMetricList(
+            TestEnvironment.Endpoint endpoint, JobID jobId, JobVertexID vertexId) throws Exception {
+        AggregatedSubtaskMetricsParameters subtaskMetricsParameters =
+                new AggregatedSubtaskMetricsParameters();
+        Iterator<MessagePathParameter<?>> pathParams =
+                subtaskMetricsParameters.getPathParameters().iterator();
+        ((JobIDPathParameter) pathParams.next()).resolve(jobId);
+        ((JobVertexIdPathParameter) pathParams.next()).resolve(vertexId);
+        return restClient
+                .sendRequest(
+                        endpoint.getAddress(),
+                        endpoint.getPort(),
+                        AggregatedSubtaskMetricsHeaders.getInstance(),
+                        subtaskMetricsParameters,
+                        EmptyRequestBody.getInstance())
+                .get(30, TimeUnit.SECONDS);
+    }
+
+    public AggregatedMetricsResponseBody getMetrics(
+            TestEnvironment.Endpoint endpoint, JobID jobId, JobVertexID vertexId, String filters)
+            throws Exception {
+        AggregatedSubtaskMetricsParameters subtaskMetricsParameters =
+                new AggregatedSubtaskMetricsParameters();
+        Iterator<MessagePathParameter<?>> pathParams =
+                subtaskMetricsParameters.getPathParameters().iterator();
+        ((JobIDPathParameter) pathParams.next()).resolve(jobId);
+        ((JobVertexIdPathParameter) pathParams.next()).resolve(vertexId);
+        MetricsFilterParameter metricFilter =
+                (MetricsFilterParameter)
+                        subtaskMetricsParameters.getQueryParameters().iterator().next();
+        metricFilter.resolveFromString(filters);
+
+        return restClient
+                .sendRequest(
+                        endpoint.getAddress(),
+                        endpoint.getPort(),
+                        AggregatedSubtaskMetricsHeaders.getInstance(),
+                        subtaskMetricsParameters,
+                        EmptyRequestBody.getInstance())
+                .get(30, TimeUnit.SECONDS);
+    }
+
+    public Double getMetricByRestApi(

Review comment:
      Maybe `getAggregatedMetricsByRestAPI` would be more accurate, since the function returns an aggregated `Double` value.

##########
File path: 
flink-connectors/flink-connector-pulsar/src/test/java/org/apache/flink/connector/pulsar/source/PulsarSourceITCase.java
##########
@@ -40,6 +42,9 @@
     @TestExternalSystem
     PulsarTestEnvironment pulsar = new PulsarTestEnvironment(PulsarRuntime.mock());
 
+    @Semantic
+    CheckpointingMode[] semantics = new CheckpointingMode[] {CheckpointingMode.EXACTLY_ONCE};

Review comment:
      I didn't find any ITCase for `CheckpointingMode.AT_LEAST_ONCE`; we should add some now that semantics support has been introduced, e.g. as sketched below.
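   A minimal sketch of declaring both guarantees (assuming the annotation from this PR and that the connector actually supports at-least-once delivery):
   ```java
   @Semantic
   CheckpointingMode[] semantics =
           new CheckpointingMode[] {
               CheckpointingMode.EXACTLY_ONCE, CheckpointingMode.AT_LEAST_ONCE
           };
   ```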

##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestDataMatchers.java
##########
@@ -33,180 +36,194 @@
 
     // ----------------------------  Matcher Builders ----------------------------------
     public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
-            List<List<T>> testRecordsLists) {
-        return new MultipleSplitDataMatcher<>(testRecordsLists);
+            List<List<T>> testRecordsLists, CheckpointingMode semantic) {
+        return new MultipleSplitDataMatcher<>(testRecordsLists, semantic);
     }
 
-    public static <T> SingleSplitDataMatcher<T> matchesSplitTestData(List<T> testData) {
-        return new SingleSplitDataMatcher<>(testData);
+    public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
+            List<List<T>> testRecordsLists,
+            CheckpointingMode semantic,
+            boolean testDataAllInResult) {
+        return new MultipleSplitDataMatcher<>(
+                testRecordsLists, MultipleSplitDataMatcher.UNSET, semantic, testDataAllInResult);
     }
 
-    public static <T> SingleSplitDataMatcher<T> matchesSplitTestData(List<T> testData, int limit) {
-        return new SingleSplitDataMatcher<>(testData, limit);
+    public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
+            List<List<T>> testRecordsLists, Integer limit, CheckpointingMode semantic) {
+        if (limit == null) {
+            return new MultipleSplitDataMatcher<>(testRecordsLists, semantic);
+        }
+        return new MultipleSplitDataMatcher<>(testRecordsLists, limit, semantic);
     }
 
     // ---------------------------- Matcher Definitions --------------------------------
-
     /**
-     * Matcher for validating test data in a single split.
+     * Matcher for validating test data from multiple splits.
+     *
+     * <p>Each list has a pointer (iterator) pointing to current checking record. When a record is
+     * received in the stream, it will be compared to all current pointing records in lists, and the
+     * pointer to the identical record will move forward.
+     *
+     * <p>If the stream preserves the correctness and order of records in all splits, all pointers
+     * should reach the end of the list finally.
      *
      * @param <T> Type of validating record
      */
-    public static class SingleSplitDataMatcher<T> extends TypeSafeDiagnosingMatcher<Iterator<T>> {
+    public static class MultipleSplitDataMatcher<T> extends Condition<Iterator<T>> {
+        private static final Logger LOG = LoggerFactory.getLogger(MultipleSplitDataMatcher.class);
+
         private static final int UNSET = -1;
 
-        private final List<T> testData;
-        private final int limit;
+        List<TestRecords<T>> testRecordsLists = new ArrayList<>();
 
+        private List<List<T>> testData;
         private String mismatchDescription = null;
+        private final int limit;
+        private final int testDataSize;
+        private final CheckpointingMode semantic;
+        private final boolean testDataAllInResult;
 
-        public SingleSplitDataMatcher(List<T> testData) {
-            this.testData = testData;
-            this.limit = UNSET;
+        public MultipleSplitDataMatcher(List<List<T>> testData, CheckpointingMode semantic) {
+            this(testData, UNSET, semantic);
         }
 
-        public SingleSplitDataMatcher(List<T> testData, int limit) {
-            if (limit > testData.size()) {
+        public MultipleSplitDataMatcher(
+                List<List<T>> testData, int limit, CheckpointingMode semantic) {
+            this(testData, limit, semantic, true);
+        }
+
+        public MultipleSplitDataMatcher(
+                List<List<T>> testData,
+                int limit,
+                CheckpointingMode semantic,
+                boolean testDataAllInResult) {
+            super();
+            int allSize = 0;
+            for (List<T> testRecordsList : testData) {
+                this.testRecordsLists.add(new TestRecords<>(testRecordsList));
+                allSize += testRecordsList.size();
+            }
+
+            if (limit > allSize) {
                 throw new IllegalArgumentException(
                         "Limit validation size should be less than number of 
test records");
             }
+            this.testDataAllInResult = testDataAllInResult;
             this.testData = testData;
+            this.semantic = semantic;
+            this.testDataSize = allSize;
             this.limit = limit;
         }
 
         @Override
-        protected boolean matchesSafely(Iterator<T> resultIterator, Description description) {
-            if (mismatchDescription != null) {
-                description.appendText(mismatchDescription);
-                return false;
+        public boolean matches(Iterator<T> resultIterator) {
+            if (CheckpointingMode.AT_LEAST_ONCE.equals(semantic)) {
+                return matchAtLeastOnce(resultIterator);
             }
+            return matchExactlyOnce(resultIterator);
+        }
 
-            boolean dataMismatch = false;
-            boolean sizeMismatch = false;
-            String sizeMismatchDescription = "";
-            String dataMismatchDescription = "";
+        protected boolean matchExactlyOnce(Iterator<T> resultIterator) {
             int recordCounter = 0;
-            for (T testRecord : testData) {
-                if (!resultIterator.hasNext()) {
-                    sizeMismatchDescription =
-                            String.format(
-                                    "Expected to have %d records in result, 
but only received %d records",
-                                    limit == UNSET ? testData.size() : limit, 
recordCounter);
-                    sizeMismatch = true;
-                    break;
-                }
-                T resultRecord = resultIterator.next();
-                if (!testRecord.equals(resultRecord)) {
-                    dataMismatchDescription =
-                            String.format(
-                                    "Mismatched record at position %d: 
Expected '%s' but was '%s'",
-                                    recordCounter, testRecord, resultRecord);
-                    dataMismatch = true;
+            while (resultIterator.hasNext()) {
+                final T record = resultIterator.next();
+                if (!matchThenNext(record)) {
+                    if (recordCounter >= testDataSize) {
+                        this.mismatchDescription =
+                                generateMismatchDescription(
+                                        String.format(
+                                                "Expected to have exactly %d 
records in result, but received more records",
+                                                testRecordsLists.stream()
+                                                        .mapToInt(list -> 
list.records.size())
+                                                        .sum()),
+                                        resultIterator);
+                    } else {
+                        this.mismatchDescription =
+                                generateMismatchDescription(
+                                        String.format(
+                                                "Unexpected record '%s' at 
position %d",
+                                                record, recordCounter),
+                                        resultIterator);
+                    }
+                    logError();
+                    return false;
                 }
                 recordCounter++;
+
                 if (limit != UNSET && recordCounter >= limit) {
                     break;
                 }
             }
-
-            if (limit == UNSET && resultIterator.hasNext()) {
-                sizeMismatchDescription =
-                        String.format(
-                                "Expected to have exactly %d records in 
result, "
-                                        + "but result iterator hasn't reached 
the end",
-                                testData.size());
-                sizeMismatch = true;
-            }
-
-            if (dataMismatch && sizeMismatch) {
-                mismatchDescription = sizeMismatchDescription + " And " + 
dataMismatchDescription;
-                return false;
-            } else if (dataMismatch) {
-                mismatchDescription = dataMismatchDescription;
-                return false;
-            } else if (sizeMismatch) {
-                mismatchDescription = sizeMismatchDescription;
+            if (limit == UNSET && !hasReachedEnd()) {
+                this.mismatchDescription =
+                        generateMismatchDescription(
+                                String.format(
+                                        "Expected to have exactly %d records 
in result, but only received %d records",
+                                        testRecordsLists.stream()
+                                                .mapToInt(list -> 
list.records.size())
+                                                .sum(),
+                                        recordCounter),
+                                resultIterator);
+                logError();
                 return false;
             } else {
                 return true;
             }
         }
 
-        @Override
-        public void describeTo(Description description) {
-            description.appendText(
-                    "Records consumed by Flink should be identical to test 
data "
-                            + "and preserve the order in split");
-        }
-    }
-
-    /**
-     * Matcher for validating test data from multiple splits.
-     *
-     * <p>Each list has a pointer (iterator) pointing to current checking record. When a record is
-     * received in the stream, it will be compared to all current pointing records in lists, and the
-     * pointer to the identical record will move forward.
-     *
-     * <p>If the stream preserves the correctness and order of records in all splits, all pointers
-     * should reach the end of the list finally.
-     *
-     * @param <T> Type of validating record
-     */
-    public static class MultipleSplitDataMatcher<T> extends TypeSafeDiagnosingMatcher<Iterator<T>> {
-
-        List<TestRecords<T>> testRecordsLists = new ArrayList<>();
-
-        private String mismatchDescription = null;
-
-        public MultipleSplitDataMatcher(List<List<T>> testData) {
-            for (List<T> testRecordsList : testData) {
-                this.testRecordsLists.add(new TestRecords<>(testRecordsList));
-            }
-        }
-
-        @Override
-        protected boolean matchesSafely(Iterator<T> resultIterator, Description description) {
-            if (mismatchDescription != null) {
-                description.appendText(mismatchDescription);
-                return false;
-            }
+        protected boolean matchAtLeastOnce(Iterator<T> resultIterator) {

Review comment:
      Could we add a unit test for the `AT_LEAST_ONCE` path? A rough sketch follows.
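   For illustration, a minimal sketch of such a test, assuming the `MultipleSplitDataMatcher(List<List<T>> testData, CheckpointingMode semantic)` constructor from this diff and that `matchAtLeastOnce` tolerates replayed records (test name and data are hypothetical):
   ```java
   @Test
   void testAtLeastOnceToleratesDuplicates() {
       List<List<Integer>> splits = Collections.singletonList(Arrays.asList(1, 2, 3));
       MultipleSplitDataMatcher<Integer> matcher =
               new MultipleSplitDataMatcher<>(splits, CheckpointingMode.AT_LEAST_ONCE);
       // Replayed (duplicate) records must not fail the match under AT_LEAST_ONCE.
       Iterator<Integer> resultWithDuplicates = Arrays.asList(1, 1, 2, 2, 3).iterator();
       assertThat(matcher.matches(resultWithDuplicates)).isTrue();
   }
   ```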

##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/testsuites/SourceTestSuiteBase.java
##########
@@ -86,6 +103,8 @@
 public abstract class SourceTestSuiteBase<T> {
 
     private static final Logger LOG = LoggerFactory.getLogger(SourceTestSuiteBase.class);
+    static ExecutorService executorService =
+            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2);

Review comment:
      `Runtime.getRuntime().availableProcessors() * 2` threads seems too expensive to me; could you also add a `private` modifier? One alternative is sketched below.
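   One possible alternative, as a hedged sketch (the pool size is arbitrary; `ExecutorThreadFactory` is assumed to be Flink's `org.apache.flink.util.concurrent.ExecutorThreadFactory`):
   ```java
   // A small, private pool with named daemon threads should be enough for test helpers.
   private static final ExecutorService executorService =
           Executors.newFixedThreadPool(4, new ExecutorThreadFactory("source-test-suite"));
   ```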

##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/test/java/org/apache/flink/connector/testframe/utils/TestDataMatchersTest.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.testframe.utils;
+
+import org.apache.flink.streaming.api.CheckpointingMode;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Unit test for {@link TestDataMatchers}. */
+public class TestDataMatchersTest {

Review comment:
      Add tests for `CheckpointingMode.AT_LEAST_ONCE` as well; a rough sketch follows.
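   As a hedged sketch, the counterpart to the at-least-once test suggested earlier could assert that exactly-once rejects what at-least-once accepts (test name and data are hypothetical):
   ```java
   @Test
   void testExactlyOnceRejectsDuplicates() {
       List<List<Integer>> splits = Collections.singletonList(Arrays.asList(1, 2, 3));
       TestDataMatchers.MultipleSplitDataMatcher<Integer> matcher =
               TestDataMatchers.matchesMultipleSplitTestData(splits, CheckpointingMode.EXACTLY_ONCE);
       // A replayed record is a mismatch under exactly-once.
       Iterator<Integer> resultWithDuplicates = Arrays.asList(1, 1, 2, 3).iterator();
       assertThat(matcher.matches(resultWithDuplicates)).isFalse();
   }
   ```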




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

