ruanhang1993 commented on a change in pull request #18516:
URL: https://github.com/apache/flink/pull/18516#discussion_r804792672
##########
File path:
flink-test-utils-parent/flink-connector-test-utils/src/main/java/org/apache/flink/connector/testframe/utils/TestDataMatchers.java
##########
@@ -33,180 +36,194 @@
// ---------------------------- Matcher Builders
----------------------------------
public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
- List<List<T>> testRecordsLists) {
- return new MultipleSplitDataMatcher<>(testRecordsLists);
+ List<List<T>> testRecordsLists, CheckpointingMode semantic) {
+ return new MultipleSplitDataMatcher<>(testRecordsLists, semantic);
}
- public static <T> SingleSplitDataMatcher<T> matchesSplitTestData(List<T>
testData) {
- return new SingleSplitDataMatcher<>(testData);
+ public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
+ List<List<T>> testRecordsLists,
+ CheckpointingMode semantic,
+ boolean testDataAllInResult) {
+ return new MultipleSplitDataMatcher<>(
+ testRecordsLists, MultipleSplitDataMatcher.UNSET, semantic,
testDataAllInResult);
}
- public static <T> SingleSplitDataMatcher<T> matchesSplitTestData(List<T>
testData, int limit) {
- return new SingleSplitDataMatcher<>(testData, limit);
+ public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
+ List<List<T>> testRecordsLists, Integer limit, CheckpointingMode
semantic) {
+ if (limit == null) {
+ return new MultipleSplitDataMatcher<>(testRecordsLists, semantic);
+ }
+ return new MultipleSplitDataMatcher<>(testRecordsLists, limit,
semantic);
}
// ---------------------------- Matcher Definitions
--------------------------------
-
/**
- * Matcher for validating test data in a single split.
+ * Matcher for validating test data from multiple splits.
+ *
+ * <p>Each list has a pointer (iterator) pointing to current checking
record. When a record is
+ * received in the stream, it will be compared to all current pointing
records in lists, and the
+ * pointer to the identical record will move forward.
+ *
+ * <p>If the stream preserves the correctness and order of records in all
splits, all pointers
+ * should reach the end of the list finally.
*
* @param <T> Type of validating record
*/
- public static class SingleSplitDataMatcher<T> extends
TypeSafeDiagnosingMatcher<Iterator<T>> {
+ public static class MultipleSplitDataMatcher<T> extends
Condition<Iterator<T>> {
+ private static final Logger LOG =
LoggerFactory.getLogger(MultipleSplitDataMatcher.class);
+
private static final int UNSET = -1;
- private final List<T> testData;
- private final int limit;
+ List<TestRecords<T>> testRecordsLists = new ArrayList<>();
+ private List<List<T>> testData;
private String mismatchDescription = null;
+ private final int limit;
+ private final int testDataSize;
+ private final CheckpointingMode semantic;
+ private final boolean testDataAllInResult;
- public SingleSplitDataMatcher(List<T> testData) {
- this.testData = testData;
- this.limit = UNSET;
+ public MultipleSplitDataMatcher(List<List<T>> testData,
CheckpointingMode semantic) {
+ this(testData, UNSET, semantic);
}
- public SingleSplitDataMatcher(List<T> testData, int limit) {
- if (limit > testData.size()) {
+ public MultipleSplitDataMatcher(
+ List<List<T>> testData, int limit, CheckpointingMode semantic)
{
+ this(testData, limit, semantic, true);
+ }
+
+ public MultipleSplitDataMatcher(
+ List<List<T>> testData,
+ int limit,
+ CheckpointingMode semantic,
+ boolean testDataAllInResult) {
+ super();
+ int allSize = 0;
+ for (List<T> testRecordsList : testData) {
+ this.testRecordsLists.add(new TestRecords<>(testRecordsList));
+ allSize += testRecordsList.size();
+ }
+
+ if (limit > allSize) {
throw new IllegalArgumentException(
"Limit validation size should be less than number of
test records");
}
+ this.testDataAllInResult = testDataAllInResult;
this.testData = testData;
+ this.semantic = semantic;
+ this.testDataSize = allSize;
this.limit = limit;
}
@Override
- protected boolean matchesSafely(Iterator<T> resultIterator,
Description description) {
- if (mismatchDescription != null) {
- description.appendText(mismatchDescription);
- return false;
+ public boolean matches(Iterator<T> resultIterator) {
+ if (CheckpointingMode.AT_LEAST_ONCE.equals(semantic)) {
+ return matchAtLeastOnce(resultIterator);
}
+ return matchExactlyOnce(resultIterator);
+ }
- boolean dataMismatch = false;
- boolean sizeMismatch = false;
- String sizeMismatchDescription = "";
- String dataMismatchDescription = "";
+ protected boolean matchExactlyOnce(Iterator<T> resultIterator) {
int recordCounter = 0;
- for (T testRecord : testData) {
- if (!resultIterator.hasNext()) {
- sizeMismatchDescription =
- String.format(
- "Expected to have %d records in result,
but only received %d records",
- limit == UNSET ? testData.size() : limit,
recordCounter);
- sizeMismatch = true;
- break;
- }
- T resultRecord = resultIterator.next();
- if (!testRecord.equals(resultRecord)) {
- dataMismatchDescription =
- String.format(
- "Mismatched record at position %d:
Expected '%s' but was '%s'",
- recordCounter, testRecord, resultRecord);
- dataMismatch = true;
+ while (resultIterator.hasNext()) {
+ final T record = resultIterator.next();
+ if (!matchThenNext(record)) {
+ if (recordCounter >= testDataSize) {
+ this.mismatchDescription =
+ generateMismatchDescription(
+ String.format(
+ "Expected to have exactly %d
records in result, but received more records",
+ testRecordsLists.stream()
+ .mapToInt(list ->
list.records.size())
+ .sum()),
+ resultIterator);
+ } else {
+ this.mismatchDescription =
+ generateMismatchDescription(
+ String.format(
+ "Unexpected record '%s' at
position %d",
+ record, recordCounter),
+ resultIterator);
+ }
+ logError();
+ return false;
}
recordCounter++;
+
if (limit != UNSET && recordCounter >= limit) {
break;
}
}
-
- if (limit == UNSET && resultIterator.hasNext()) {
- sizeMismatchDescription =
- String.format(
- "Expected to have exactly %d records in
result, "
- + "but result iterator hasn't reached
the end",
- testData.size());
- sizeMismatch = true;
- }
-
- if (dataMismatch && sizeMismatch) {
- mismatchDescription = sizeMismatchDescription + " And " +
dataMismatchDescription;
- return false;
- } else if (dataMismatch) {
- mismatchDescription = dataMismatchDescription;
- return false;
- } else if (sizeMismatch) {
- mismatchDescription = sizeMismatchDescription;
+ if (limit == UNSET && !hasReachedEnd()) {
+ this.mismatchDescription =
+ generateMismatchDescription(
+ String.format(
+ "Expected to have exactly %d records
in result, but only received %d records",
+ testRecordsLists.stream()
+ .mapToInt(list ->
list.records.size())
+ .sum(),
+ recordCounter),
+ resultIterator);
+ logError();
return false;
} else {
return true;
}
}
- @Override
- public void describeTo(Description description) {
- description.appendText(
- "Records consumed by Flink should be identical to test
data "
- + "and preserve the order in split");
- }
- }
-
- /**
- * Matcher for validating test data from multiple splits.
- *
- * <p>Each list has a pointer (iterator) pointing to current checking
record. When a record is
- * received in the stream, it will be compared to all current pointing
records in lists, and the
- * pointer to the identical record will move forward.
- *
- * <p>If the stream preserves the correctness and order of records in all
splits, all pointers
- * should reach the end of the list finally.
- *
- * @param <T> Type of validating record
- */
- public static class MultipleSplitDataMatcher<T> extends
TypeSafeDiagnosingMatcher<Iterator<T>> {
-
- List<TestRecords<T>> testRecordsLists = new ArrayList<>();
-
- private String mismatchDescription = null;
-
- public MultipleSplitDataMatcher(List<List<T>> testData) {
- for (List<T> testRecordsList : testData) {
- this.testRecordsLists.add(new TestRecords<>(testRecordsList));
- }
- }
-
- @Override
- protected boolean matchesSafely(Iterator<T> resultIterator,
Description description) {
- if (mismatchDescription != null) {
- description.appendText(mismatchDescription);
- return false;
- }
+ protected boolean matchAtLeastOnce(Iterator<T> resultIterator) {
Review comment:
This has been fixed in PR #18547.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]