This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b6bf1aad5f2ebeb318b98162271eded355685ac8
Author: Qingsheng Ren <[email protected]>
AuthorDate: Thu Aug 12 17:59:30 2021 +0800

    [FLINK-19554][connector/testing-framework] Base test class for source with fundamental test cases
---
 .../common/testsuites/SourceTestSuiteBase.java     | 299 +++++++++++++++++++++
 .../test/common/utils/TestDataMatchers.java        | 246 +++++++++++++++++
 .../test/common/utils/IteratorWithCurrentTest.java |  77 ++++++
 .../test/common/utils/TestDataMatchersTest.java    | 166 ++++++++++++
 4 files changed, 788 insertions(+)

diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
new file mode 100644
index 0000000..4b90272
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.testsuites;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.JobStatus;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.connectors.test.common.environment.ClusterControllable;
+import org.apache.flink.connectors.test.common.environment.TestEnvironment;
+import org.apache.flink.connectors.test.common.external.ExternalContext;
+import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter;
+import org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension;
+import org.apache.flink.connectors.test.common.junit.extensions.TestCaseInvocationContextProvider;
+import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension;
+import org.apache.flink.core.execution.JobClient;
+import org.apache.flink.runtime.testutils.CommonTestUtils;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator;
+import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.streaming.api.operators.collect.CollectStreamSink;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.apache.flink.connectors.test.common.utils.TestDataMatchers.matchesMultipleSplitTestData;
+import static org.apache.flink.connectors.test.common.utils.TestDataMatchers.matchesSplitTestData;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+/**
+ * Base class for all source test suites.
+ *
+ * <p>All test cases should have descriptive JavaDoc, including:
+ *
+ * <ul>
+ *   <li>The purpose of the case
+ *   <li>A brief description of how the case works
+ *   <li>The conditions that must be fulfilled in order to pass the case
+ *   <li>The requirements for running the case
+ * </ul>
+ */
+@ExtendWith({
+    ConnectorTestingExtension.class,
+    TestLoggerExtension.class,
+    TestCaseInvocationContextProvider.class
+})
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Experimental
+public abstract class SourceTestSuiteBase<T> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SourceTestSuiteBase.class);
+
+    // ----------------------------- Basic test cases ---------------------------------
+
+    /**
+     * Test connector source with only one split in the external system.
+     *
+     * <p>This test will create one split in the external system, write test data into it, and
+     * consume it back via a Flink job with parallelism 1.
+     *
+     * <p>The number and order of records consumed by Flink need to be identical to the test data
+     * written to the external system in order to pass this test.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @TestTemplate
+    @DisplayName("Test source with single split")
+    public void testSourceSingleSplit(TestEnvironment testEnv, ExternalContext<T> externalContext)
+            throws Exception {
+
+        // Write test data to external system
+        final Collection<T> testRecords = generateAndWriteTestData(externalContext);
+
+        // Build and execute Flink job
+        StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment();
+
+        try (CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1)
+                        .executeAndCollect("Source Single Split Test")) {
+            // Check test result
+            assertThat(resultIterator, matchesSplitTestData(testRecords));
+        }
+    }
+
+    /**
+     * Test connector source with multiple splits in the external system.
+     *
+     * <p>This test will create 4 splits in the external system, write test data to all splits, and
+     * consume them back via a Flink job with parallelism 4.
+     *
+     * <p>The number and order of records in each split consumed by Flink need to be identical to
+     * the test data written into the external system to pass this test. There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @TestTemplate
+    @DisplayName("Test source with multiple splits")
+    public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        LOG.debug("Build and execute Flink job");
+        StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment();
+
+        try (final CloseableIterator<T> resultIterator =
+                execEnv.fromSource(
+                                externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber)
+                        .executeAndCollect("Source Multiple Split Test")) {
+            // Check test result
+            assertThat(resultIterator, matchesMultipleSplitTestData(testRecordCollections));
+        }
+    }
+
+    /**
+     * Test connector source with an idle reader.
+     *
+     * <p>This test will create 4 splits in the external system, write test data to all splits, and
+     * consume them back via a Flink job with parallelism 5, so at least one source reader will be
+     * idle (assigned no splits). If the split enumerator of the source doesn't signal
+     * NoMoreSplitsEvent to the idle source reader, the Flink job will never reach the FINISHED
+     * state.
+     *
+     * <p>The number and order of records in each split consumed by Flink need to be identical to
+     * the test data written into the external system to pass this test. There's no requirement for
+     * record order across splits.
+     *
+     * <p>A bounded source is required for this test.
+     */
+    @TestTemplate
+    @DisplayName("Test source with at least one idle parallelism")
+    public void testIdleReader(TestEnvironment testEnv, ExternalContext<T> externalContext)
+            throws Exception {
+
+        final int splitNumber = 4;
+        final List<Collection<T>> testRecordCollections = new ArrayList<>();
+        for (int i = 0; i < splitNumber; i++) {
+            testRecordCollections.add(generateAndWriteTestData(externalContext));
+        }
+
+        try (CloseableIterator<T> resultIterator =
+                testEnv.createExecutionEnvironment()
+                        .fromSource(
+                                externalContext.createSource(Boundedness.BOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(splitNumber + 1)
+                        .executeAndCollect("Redundant Parallelism Test")) {
+            assertThat(resultIterator, matchesMultipleSplitTestData(testRecordCollections));
+        }
+    }
+
+    /**
+     * Test connector source with task manager failover.
+     *
+     * <p>This test will create 1 split in the external system, write test record set A into the
+     * split, restart the TaskManager to trigger a job failover, write test record set B into the
+     * split, and finally terminate the Flink job.
+     *
+     * <p>The number and order of records consumed by Flink should be identical to A before the
+     * failover and B after the failover in order to pass the test.
+     *
+     * <p>An unbounded source is required for this test, since the TaskManager failover will be
+     * triggered in the middle of the test.
+     */
+    @TestTemplate
+    @DisplayName("Test TaskManager failure")
+    public void testTaskManagerFailure(
+            TestEnvironment testEnv,
+            ExternalContext<T> externalContext,
+            ClusterControllable controller)
+            throws Exception {
+
+        final Collection<T> testRecordsBeforeFailure =
+                externalContext.generateTestData(ThreadLocalRandom.current().nextLong());
+        final SourceSplitDataWriter<T> sourceSplitDataWriter =
+                externalContext.createSourceSplitDataWriter();
+        sourceSplitDataWriter.writeRecords(testRecordsBeforeFailure);
+
+        final StreamExecutionEnvironment env = testEnv.createExecutionEnvironment();
+
+        env.enableCheckpointing(50);
+        final DataStreamSource<T> dataStreamSource =
+                env.fromSource(
+                                externalContext.createSource(Boundedness.CONTINUOUS_UNBOUNDED),
+                                WatermarkStrategy.noWatermarks(),
+                                "Tested Source")
+                        .setParallelism(1);
+
+        // Since the DataStream API doesn't expose the job client for executeAndCollect(), we have
+        // to reuse this part of the code to get both the job client and the result iterator :-(
+        // ------------------------------------ START ---------------------------------------------
+        TypeSerializer<T> serializer = dataStreamSource.getType().createSerializer(env.getConfig());
+        String accumulatorName = "dataStreamCollect_" + UUID.randomUUID();
+        CollectSinkOperatorFactory<T> factory =
+                new CollectSinkOperatorFactory<>(serializer, accumulatorName);
+        CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator();
+        CollectResultIterator<T> iterator =
+                new CollectResultIterator<>(
+                        operator.getOperatorIdFuture(),
+                        serializer,
+                        accumulatorName,
+                        env.getCheckpointConfig());
+        CollectStreamSink<T> sink = new CollectStreamSink<>(dataStreamSource, factory);
+        sink.name("Data stream collect sink");
+        env.addOperator(sink.getTransformation());
+        final JobClient jobClient = env.executeAsync("TaskManager Failover Test");
+        iterator.setJobClient(jobClient);
+        // -------------------------------------- END ---------------------------------------------
+
+        assertThat(
+                iterator,
+                matchesSplitTestData(testRecordsBeforeFailure, testRecordsBeforeFailure.size()));
+
+        // -------------------------------- Trigger failover ---------------------------------------
+        controller.triggerTaskManagerFailover(jobClient, () -> {});
+
+        CommonTestUtils.waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.RUNNING),
+                Deadline.fromNow(Duration.ofSeconds(30)));
+
+        final Collection<T> testRecordsAfterFailure =
+                externalContext.generateTestData(ThreadLocalRandom.current().nextLong());
+        sourceSplitDataWriter.writeRecords(testRecordsAfterFailure);
+
+        assertThat(
+                iterator,
+                matchesSplitTestData(testRecordsAfterFailure, testRecordsAfterFailure.size()));
+
+        // Clean up
+        iterator.close();
+        CommonTestUtils.terminateJob(jobClient, Duration.ofSeconds(30));
+        CommonTestUtils.waitForJobStatus(
+                jobClient,
+                Collections.singletonList(JobStatus.CANCELED),
+                Deadline.fromNow(Duration.ofSeconds(30)));
+    }
+
+    // ----------------------------- Helper Functions ---------------------------------
+
+    /**
+     * Generate a set of test records and write them to the external system via a split writer.
+     *
+     * @param externalContext External context
+     * @return Collection of generated test records
+     */
+    protected Collection<T> generateAndWriteTestData(ExternalContext<T> externalContext) {
+        final Collection<T> testRecordCollection =
+                externalContext.generateTestData(ThreadLocalRandom.current().nextLong());
+        LOG.debug("Writing {} records to external system", testRecordCollection.size());
+        externalContext.createSourceSplitDataWriter().writeRecords(testRecordCollection);
+        return testRecordCollection;
+    }
+}
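
Since every case above is a @TestTemplate driven by the registered JUnit 5 extensions, a
connector module only needs to subclass the suite to inherit all of the cases. A minimal
sketch, assuming a hypothetical ExampleSourceTestSuite class; how ConnectorTestingExtension
discovers the TestEnvironment and ExternalContext instances is configured elsewhere and not
shown in this commit:

    package org.example.connector.tests; // hypothetical package

    import org.apache.flink.connectors.test.common.testsuites.SourceTestSuiteBase;

    // All inherited @TestTemplate cases (single split, multiple splits, idle
    // reader, TaskManager failover) are instantiated once per TestEnvironment /
    // ExternalContext combination by TestCaseInvocationContextProvider.
    public class ExampleSourceTestSuite extends SourceTestSuiteBase<String> {
        // The TestEnvironment and ExternalContext wiring is supplied through
        // the JUnit 5 extensions registered on SourceTestSuiteBase.
    }
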
diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java
new file mode 100644
index 0000000..87d4905
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.utils;
+
+import org.apache.flink.annotation.Internal;
+
+import org.hamcrest.Description;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/** Matchers for validating test data. */
+@Internal
+public class TestDataMatchers {
+
+    // ----------------------------  Matcher Builders ----------------------------------
+    public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData(
+            Collection<Collection<T>> testDataCollections) {
+        return new MultipleSplitDataMatcher<>(testDataCollections);
+    }
+
+    public static <T> SplitDataMatcher<T> matchesSplitTestData(Collection<T> testData) {
+        return new SplitDataMatcher<>(testData);
+    }
+
+    public static <T> SplitDataMatcher<T> matchesSplitTestData(Collection<T> testData, int limit) {
+        return new SplitDataMatcher<>(testData, limit);
+    }
+
+    // ---------------------------- Matcher Definitions --------------------------------
+
+    /**
+     * Matcher for validating test data in a single split.
+     *
+     * @param <T> Type of records to validate
+     */
+    public static class SplitDataMatcher<T> extends TypeSafeDiagnosingMatcher<Iterator<T>> {
+        private static final int UNSET = -1;
+
+        private final Collection<T> testData;
+        private final int limit;
+
+        private String mismatchDescription = null;
+
+        public SplitDataMatcher(Collection<T> testData) {
+            this.testData = testData;
+            this.limit = UNSET;
+        }
+
+        public SplitDataMatcher(Collection<T> testData, int limit) {
+            if (limit > testData.size()) {
+                throw new IllegalArgumentException(
+                        "Limit validation size should be less than number of 
test records");
+            }
+            this.testData = testData;
+            this.limit = limit;
+        }
+
+        @Override
+        protected boolean matchesSafely(Iterator<T> resultIterator, Description description) {
+            if (mismatchDescription != null) {
+                description.appendText(mismatchDescription);
+                return false;
+            }
+
+            int recordCounter = 0;
+            for (T testRecord : testData) {
+                if (!resultIterator.hasNext()) {
+                    mismatchDescription = "Result data is less than test data";
+                    return false;
+                }
+                T resultRecord = resultIterator.next();
+                if (!testRecord.equals(resultRecord)) {
+                    mismatchDescription =
+                            String.format(
+                                    "Mismatched record at position %d: 
Expected '%s' but was '%s'",
+                                    recordCounter, testRecord, resultRecord);
+                    return false;
+                }
+                recordCounter++;
+                if (limit != UNSET && recordCounter >= limit) {
+                    break;
+                }
+            }
+            if (limit == UNSET && resultIterator.hasNext()) {
+                mismatchDescription = "Result data is more than test data";
+                return false;
+            } else {
+                return true;
+            }
+        }
+
+        @Override
+        public void describeTo(Description description) {
+            description.appendText(
+                    "Records consumed by Flink should be identical to test 
data "
+                            + "and preserve the order in split");
+        }
+    }
+
+    /**
+     * Matcher for validating test data from multiple splits.
+     *
+     * <p>Each collection has a pointer (iterator) pointing to the record currently being checked.
+     * When a record is received from the stream, it is compared to the current record of every
+     * collection, and the pointer of the matching collection moves forward.
+     *
+     * <p>If the stream preserves the correctness and order of records in all splits, every pointer
+     * will eventually reach the end of its collection.
+     *
+     * @param <T> Type of records to validate
+     */
+    public static class MultipleSplitDataMatcher<T> extends TypeSafeDiagnosingMatcher<Iterator<T>> {
+
+        private final List<IteratorWithCurrent<T>> testDataIterators = new ArrayList<>();
+
+        private String mismatchDescription = null;
+
+        public MultipleSplitDataMatcher(Collection<Collection<T>> testDataCollections) {
+            for (Collection<T> testDataCollection : testDataCollections) {
+                testDataIterators.add(new IteratorWithCurrent<>(testDataCollection.iterator()));
+            }
+        }
+
+        @Override
+        protected boolean matchesSafely(Iterator<T> resultIterator, Description description) {
+            if (mismatchDescription != null) {
+                description.appendText(mismatchDescription);
+                return false;
+            }
+            while (resultIterator.hasNext()) {
+                final T record = resultIterator.next();
+                if (!matchThenNext(record)) {
+                    mismatchDescription = String.format("Unexpected record '%s'", record);
+                    return false;
+                }
+            }
+            if (!hasReachedEnd()) {
+                mismatchDescription = "Result data is less than test data";
+                return false;
+            } else {
+                return true;
+            }
+        }
+
+        @Override
+        public void describeTo(Description description) {
+            description.appendText(
+                    "Records consumed by Flink should be identical to test 
data "
+                            + "and preserve the order in multiple splits");
+        }
+
+        /**
+         * Check if the current record of any split is identical to the record from the stream, and
+         * move that split's pointer to the next record if matched.
+         *
+         * @param record Record from the stream
+         * @return True if a matching record was found, false otherwise.
+         */
+        private boolean matchThenNext(T record) {
+            for (IteratorWithCurrent<T> testDataIterator : testDataIterators) {
+                if (record.equals(testDataIterator.current())) {
+                    testDataIterator.next();
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        /**
+         * Whether all pointers have reached the end of their collections.
+         *
+         * @return True if all pointers have reached the end.
+         */
+        private boolean hasReachedEnd() {
+            for (IteratorWithCurrent<T> testDataIterator : testDataIterators) {
+                if (testDataIterator.hasNext()) {
+                    return false;
+                }
+            }
+            return true;
+        }
+    }
+
+    /**
+     * An iterator wrapper which can access the element that the iterator is currently pointing
+     * to.
+     *
+     * @param <E> The type of elements returned by this iterator.
+     */
+    static class IteratorWithCurrent<E> implements Iterator<E> {
+
+        private final Iterator<E> originalIterator;
+        private E current;
+
+        public IteratorWithCurrent(Iterator<E> originalIterator) {
+            this.originalIterator = originalIterator;
+            try {
+                current = originalIterator.next();
+            } catch (NoSuchElementException e) {
+                current = null;
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return current != null;
+        }
+
+        @Override
+        public E next() {
+            if (current == null) {
+                throw new NoSuchElementException();
+            }
+            E previous = current;
+            if (originalIterator.hasNext()) {
+                current = originalIterator.next();
+            } else {
+                current = null;
+            }
+            return previous;
+        }
+
+        public E current() {
+            return current;
+        }
+    }
+}
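
To make the pointer semantics of MultipleSplitDataMatcher concrete: records from different
splits may interleave arbitrarily in the stream, but each split's own order must be
preserved. A minimal sketch using only the API above (the class name and data are
illustrative):

    import static org.apache.flink.connectors.test.common.utils.TestDataMatchers.matchesMultipleSplitTestData;
    import static org.hamcrest.MatcherAssert.assertThat;

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.List;

    public class MultipleSplitMatcherSketch {
        public static void main(String[] args) {
            List<Collection<String>> splits =
                    Arrays.asList(
                            Arrays.asList("a1", "a2"), // split A
                            Arrays.asList("b1", "b2")); // split B
            // Passes: records interleave across splits, but "a1" precedes "a2"
            // and "b1" precedes "b2", so every per-split pointer reaches the end.
            List<String> stream = Arrays.asList("a1", "b1", "b2", "a2");
            assertThat(stream.iterator(), matchesMultipleSplitTestData(splits));
            // A stream containing "b2" before "b1" would instead fail with the
            // description "Unexpected record 'b2'".
        }
    }
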
diff --git a/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/IteratorWithCurrentTest.java b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/IteratorWithCurrentTest.java
new file mode 100644
index 0000000..97a7b48
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/IteratorWithCurrentTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.utils;
+
+import org.apache.flink.connectors.test.common.utils.TestDataMatchers.IteratorWithCurrent;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/** Unit test for {@link IteratorWithCurrent}. */
+public class IteratorWithCurrentTest {
+
+    @Test
+    public void testIterator() {
+        List<Integer> numberList = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            numberList.add(i);
+        }
+        IteratorWithCurrent<Integer> iterator = new IteratorWithCurrent<>(numberList.iterator());
+        Integer num = 0;
+        while (iterator.hasNext()) {
+            assertEquals(num, iterator.next());
+            num++;
+        }
+        assertEquals(10, num.intValue());
+        assertNull(iterator.current());
+    }
+
+    @Test
+    public void testEmptyList() {
+        IteratorWithCurrent<Integer> iterator =
+                new IteratorWithCurrent<>(Collections.emptyIterator());
+        assertNull(iterator.current());
+        assertThrows(NoSuchElementException.class, iterator::next);
+    }
+
+    @Test
+    public void testCurrentElement() {
+        List<Integer> numberList = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            numberList.add(i);
+        }
+        IteratorWithCurrent<Integer> iterator = new IteratorWithCurrent<>(numberList.iterator());
+        Integer num = 0;
+        while (iterator.hasNext()) {
+            assertEquals(num, iterator.current());
+            assertEquals(num, iterator.next());
+            num++;
+        }
+        assertEquals(10, num.intValue());
+        assertNull(iterator.current());
+    }
+}
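
The wrapper's value over a plain Iterator is the non-consuming peek: MultipleSplitDataMatcher
reads current() from every split and advances only the split that matched. A small sketch of
that pattern (placed in the same package, since IteratorWithCurrent is package-private; names
are illustrative):

    package org.apache.flink.connectors.test.common.utils;

    import org.apache.flink.connectors.test.common.utils.TestDataMatchers.IteratorWithCurrent;

    import java.util.Arrays;

    public class PeekSketch {
        public static void main(String[] args) {
            IteratorWithCurrent<String> it =
                    new IteratorWithCurrent<>(Arrays.asList("a", "b").iterator());
            // current() can be called repeatedly without consuming anything.
            System.out.println(it.current()); // prints "a"
            System.out.println(it.current()); // still "a"
            // Advance only on a match, as MultipleSplitDataMatcher.matchThenNext does.
            if ("a".equals(it.current())) {
                it.next();
            }
            System.out.println(it.current()); // now "b"
        }
    }
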
diff --git a/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java
new file mode 100644
index 0000000..1833185
--- /dev/null
+++ b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connectors.test.common.utils;
+
+import org.hamcrest.Description;
+import org.hamcrest.StringDescription;
+import org.hamcrest.TypeSafeDiagnosingMatcher;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+/** Unit test for {@link TestDataMatchers}. */
+public class TestDataMatchersTest {
+    @Nested
+    class SplitDataMatcherTest {
+        private final List<String> testData = Arrays.asList("alpha", "beta", "gamma");
+
+        @Test
+        public void testPositiveCases() {
+            assertThat(testData.iterator(), TestDataMatchers.matchesSplitTestData(testData));
+            assertThat(
+                    testData.iterator(),
+                    TestDataMatchers.matchesSplitTestData(testData, testData.size()));
+            assertThat(
+                    testData.iterator(),
+                    TestDataMatchers.matchesSplitTestData(testData, testData.size() - 1));
+        }
+
+        @Test
+        public void testMismatch() throws Exception {
+            final List<String> resultData = new ArrayList<>(testData);
+            resultData.set(1, "delta");
+            final Iterator<String> resultIterator = resultData.iterator();
+
+            final TestDataMatchers.SplitDataMatcher<String> matcher =
+                    TestDataMatchers.matchesSplitTestData(testData);
+
+            assertMatcherFailedWithDescription(
+                    resultIterator,
+                    matcher,
+                    "Mismatched record at position 1: Expected 'beta' but was 
'delta'");
+        }
+
+        @Test
+        public void testResultMoreThanExpected() throws Exception {
+            final List<String> resultData = new ArrayList<>(testData);
+            resultData.add("delta");
+            final Iterator<String> resultIterator = resultData.iterator();
+
+            final TestDataMatchers.SplitDataMatcher<String> matcher =
+                    TestDataMatchers.matchesSplitTestData(testData);
+
+            assertMatcherFailedWithDescription(
+                    resultIterator, matcher, "Result data is more than test 
data");
+        }
+
+        @Test
+        public void testResultLessThanExpected() throws Exception {
+            final List<String> resultData = new ArrayList<>(testData);
+            resultData.remove(testData.size() - 1);
+            final Iterator<String> resultIterator = resultData.iterator();
+
+            final TestDataMatchers.SplitDataMatcher<String> matcher =
+                    TestDataMatchers.matchesSplitTestData(testData);
+
+            assertMatcherFailedWithDescription(
+                    resultIterator, matcher, "Result data is less than test 
data");
+        }
+    }
+
+    @Nested
+    class MultipleSplitDataMatcherTest {
+        private final List<String> splitA = Arrays.asList("alpha", "beta", "gamma");
+        private final List<String> splitB = Arrays.asList("one", "two", "three");
+        private final List<String> splitC = Arrays.asList("1", "2", "3");
+        private final List<Collection<String>> testDataCollection =
+                Arrays.asList(splitA, splitB, splitC);
+
+        @Test
+        public void testPositiveCase() {
+            final List<String> result = unionLists(splitA, splitB, splitC);
+            assertThat(
+                    result.iterator(),
+                    TestDataMatchers.matchesMultipleSplitTestData(testDataCollection));
+        }
+
+        @Test
+        public void testResultLessThanExpected() throws Exception {
+            final ArrayList<String> splitATestDataWithoutLast = new ArrayList<>(splitA);
+            splitATestDataWithoutLast.remove(splitA.size() - 1);
+            final List<String> result = unionLists(splitATestDataWithoutLast, splitB, splitC);
+            final TestDataMatchers.MultipleSplitDataMatcher<String> matcher =
+                    TestDataMatchers.matchesMultipleSplitTestData(testDataCollection);
+            assertMatcherFailedWithDescription(
+                    result.iterator(), matcher, "Result data is less than test 
data");
+        }
+
+        @Test
+        public void testResultMoreThanExpected() throws Exception {
+            final List<String> result = unionLists(splitA, splitB, splitC);
+            result.add("delta");
+            final TestDataMatchers.MultipleSplitDataMatcher<String> matcher =
+                    TestDataMatchers.matchesMultipleSplitTestData(testDataCollection);
+            assertMatcherFailedWithDescription(
+                    result.iterator(), matcher, "Unexpected record 'delta'");
+        }
+
+        @Test
+        public void testOutOfOrder() throws Exception {
+            List<String> reverted = new ArrayList<>(splitC);
+            Collections.reverse(reverted);
+            final List<String> result = unionLists(splitA, splitB, reverted);
+            final TestDataMatchers.MultipleSplitDataMatcher<String> matcher =
+                    TestDataMatchers.matchesMultipleSplitTestData(testDataCollection);
+            assertMatcherFailedWithDescription(result.iterator(), matcher, "Unexpected record '3'");
+        }
+    }
+
+    @SafeVarargs
+    private final <T> List<T> unionLists(List<T>... lists) {
+        return Stream.of(lists).flatMap(Collection::stream).collect(Collectors.toList());
+    }
+
+    private <T> void assertMatcherFailedWithDescription(
+            T object, TypeSafeDiagnosingMatcher<T> matcher, String expectedDescription)
+            throws Exception {
+        final Method method =
+                TypeSafeDiagnosingMatcher.class.getDeclaredMethod(
+                        "matchesSafely", Object.class, Description.class);
+        method.setAccessible(true);
+        assertFalse((boolean) method.invoke(matcher, object, new Description.NullDescription()));
+
+        final StringDescription actualDescription = new StringDescription();
+        method.invoke(matcher, object, actualDescription);
+        Assertions.assertEquals(expectedDescription, actualDescription.toString());
+    }
+}
