ruanhang1993 commented on a change in pull request #18516:
URL: https://github.com/apache/flink/pull/18516#discussion_r804792672



##########
File path: 
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestDataMatchers.java
##########
@@ -33,180 +36,194 @@
 
     // ----------------------------  Matcher Builders 
----------------------------------
     public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
-            List<List<T>> testRecordsLists) {
-        return new MultipleSplitDataMatcher<>(testRecordsLists);
+            List<List<T>> testRecordsLists, CheckpointingMode semantic) {
+        return new MultipleSplitDataMatcher<>(testRecordsLists, semantic);
     }
 
-    public static <T> SingleSplitDataMatcher<T> matchesSplitTestData(List<T> 
testData) {
-        return new SingleSplitDataMatcher<>(testData);
+    public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
+            List<List<T>> testRecordsLists,
+            CheckpointingMode semantic,
+            boolean testDataAllInResult) {
+        return new MultipleSplitDataMatcher<>(
+                testRecordsLists, MultipleSplitDataMatcher.UNSET, semantic, 
testDataAllInResult);
     }
 
-    public static <T> SingleSplitDataMatcher<T> matchesSplitTestData(List<T> 
testData, int limit) {
-        return new SingleSplitDataMatcher<>(testData, limit);
+    public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
+            List<List<T>> testRecordsLists, Integer limit, CheckpointingMode 
semantic) {
+        if (limit == null) {
+            return new MultipleSplitDataMatcher<>(testRecordsLists, semantic);
+        }
+        return new MultipleSplitDataMatcher<>(testRecordsLists, limit, 
semantic);
     }
 
     // ---------------------------- Matcher Definitions 
--------------------------------
-
     /**
-     * Matcher for validating test data in a single split.
+     * Matcher for validating test data from multiple splits.
+     *
+     * <p>Each list has a pointer (iterator) pointing to current checking 
record. When a record is
+     * received in the stream, it will be compared to all current pointing 
records in lists, and the
+     * pointer to the identical record will move forward.
+     *
+     * <p>If the stream preserves the correctness and order of records in all 
splits, all pointers
+     * should reach the end of the list finally.
      *
      * @param <T> Type of validating record
      */
-    public static class SingleSplitDataMatcher<T> extends 
TypeSafeDiagnosingMatcher<Iterator<T>> {
+    public static class MultipleSplitDataMatcher<T> extends 
Condition<Iterator<T>> {
+        private static final Logger LOG = 
LoggerFactory.getLogger(MultipleSplitDataMatcher.class);
+
         private static final int UNSET = -1;
 
-        private final List<T> testData;
-        private final int limit;
+        List<TestRecords<T>> testRecordsLists = new ArrayList<>();
 
+        private List<List<T>> testData;
         private String mismatchDescription = null;
+        private final int limit;
+        private final int testDataSize;
+        private final CheckpointingMode semantic;
+        private final boolean testDataAllInResult;
 
-        public SingleSplitDataMatcher(List<T> testData) {
-            this.testData = testData;
-            this.limit = UNSET;
+        public MultipleSplitDataMatcher(List<List<T>> testData, 
CheckpointingMode semantic) {
+            this(testData, UNSET, semantic);
         }
 
-        public SingleSplitDataMatcher(List<T> testData, int limit) {
-            if (limit > testData.size()) {
+        public MultipleSplitDataMatcher(
+                List<List<T>> testData, int limit, CheckpointingMode semantic) 
{
+            this(testData, limit, semantic, true);
+        }
+
+        public MultipleSplitDataMatcher(
+                List<List<T>> testData,
+                int limit,
+                CheckpointingMode semantic,
+                boolean testDataAllInResult) {
+            super();
+            int allSize = 0;
+            for (List<T> testRecordsList : testData) {
+                this.testRecordsLists.add(new TestRecords<>(testRecordsList));
+                allSize += testRecordsList.size();
+            }
+
+            if (limit > allSize) {
                 throw new IllegalArgumentException(
                         "Limit validation size should be less than number of 
test records");
             }
+            this.testDataAllInResult = testDataAllInResult;
             this.testData = testData;
+            this.semantic = semantic;
+            this.testDataSize = allSize;
             this.limit = limit;
         }
 
         @Override
-        protected boolean matchesSafely(Iterator<T> resultIterator, 
Description description) {
-            if (mismatchDescription != null) {
-                description.appendText(mismatchDescription);
-                return false;
+        public boolean matches(Iterator<T> resultIterator) {
+            if (CheckpointingMode.AT_LEAST_ONCE.equals(semantic)) {
+                return matchAtLeastOnce(resultIterator);
             }
+            return matchExactlyOnce(resultIterator);
+        }
 
-            boolean dataMismatch = false;
-            boolean sizeMismatch = false;
-            String sizeMismatchDescription = "";
-            String dataMismatchDescription = "";
+        protected boolean matchExactlyOnce(Iterator<T> resultIterator) {
             int recordCounter = 0;
-            for (T testRecord : testData) {
-                if (!resultIterator.hasNext()) {
-                    sizeMismatchDescription =
-                            String.format(
-                                    "Expected to have %d records in result, 
but only received %d records",
-                                    limit == UNSET ? testData.size() : limit, 
recordCounter);
-                    sizeMismatch = true;
-                    break;
-                }
-                T resultRecord = resultIterator.next();
-                if (!testRecord.equals(resultRecord)) {
-                    dataMismatchDescription =
-                            String.format(
-                                    "Mismatched record at position %d: 
Expected '%s' but was '%s'",
-                                    recordCounter, testRecord, resultRecord);
-                    dataMismatch = true;
+            while (resultIterator.hasNext()) {
+                final T record = resultIterator.next();
+                if (!matchThenNext(record)) {
+                    if (recordCounter >= testDataSize) {
+                        this.mismatchDescription =
+                                generateMismatchDescription(
+                                        String.format(
+                                                "Expected to have exactly %d 
records in result, but received more records",
+                                                testRecordsLists.stream()
+                                                        .mapToInt(list -> 
list.records.size())
+                                                        .sum()),
+                                        resultIterator);
+                    } else {
+                        this.mismatchDescription =
+                                generateMismatchDescription(
+                                        String.format(
+                                                "Unexpected record '%s' at 
position %d",
+                                                record, recordCounter),
+                                        resultIterator);
+                    }
+                    logError();
+                    return false;
                 }
                 recordCounter++;
+
                 if (limit != UNSET && recordCounter >= limit) {
                     break;
                 }
             }
-
-            if (limit == UNSET && resultIterator.hasNext()) {
-                sizeMismatchDescription =
-                        String.format(
-                                "Expected to have exactly %d records in 
result, "
-                                        + "but result iterator hasn't reached 
the end",
-                                testData.size());
-                sizeMismatch = true;
-            }
-
-            if (dataMismatch && sizeMismatch) {
-                mismatchDescription = sizeMismatchDescription + " And " + 
dataMismatchDescription;
-                return false;
-            } else if (dataMismatch) {
-                mismatchDescription = dataMismatchDescription;
-                return false;
-            } else if (sizeMismatch) {
-                mismatchDescription = sizeMismatchDescription;
+            if (limit == UNSET && !hasReachedEnd()) {
+                this.mismatchDescription =
+                        generateMismatchDescription(
+                                String.format(
+                                        "Expected to have exactly %d records 
in result, but only received %d records",
+                                        testRecordsLists.stream()
+                                                .mapToInt(list -> 
list.records.size())
+                                                .sum(),
+                                        recordCounter),
+                                resultIterator);
+                logError();
                 return false;
             } else {
                 return true;
             }
         }
 
-        @Override
-        public void describeTo(Description description) {
-            description.appendText(
-                    "Records consumed by Flink should be identical to test 
data "
-                            + "and preserve the order in split");
-        }
-    }
-
-    /**
-     * Matcher for validating test data from multiple splits.
-     *
-     * <p>Each list has a pointer (iterator) pointing to current checking 
record. When a record is
-     * received in the stream, it will be compared to all current pointing 
records in lists, and the
-     * pointer to the identical record will move forward.
-     *
-     * <p>If the stream preserves the correctness and order of records in all 
splits, all pointers
-     * should reach the end of the list finally.
-     *
-     * @param <T> Type of validating record
-     */
-    public static class MultipleSplitDataMatcher<T> extends 
TypeSafeDiagnosingMatcher<Iterator<T>> {
-
-        List<TestRecords<T>> testRecordsLists = new ArrayList<>();
-
-        private String mismatchDescription = null;
-
-        public MultipleSplitDataMatcher(List<List<T>> testData) {
-            for (List<T> testRecordsList : testData) {
-                this.testRecordsLists.add(new TestRecords<>(testRecordsList));
-            }
-        }
-
-        @Override
-        protected boolean matchesSafely(Iterator<T> resultIterator, 
Description description) {
-            if (mismatchDescription != null) {
-                description.appendText(mismatchDescription);
-                return false;
-            }
+        protected boolean matchAtLeastOnce(Iterator<T> resultIterator) {

Review comment:
       This has been fixed in PR #18547.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to