This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit b6bf1aad5f2ebeb318b98162271eded355685ac8 Author: Qingsheng Ren <[email protected]> AuthorDate: Thu Aug 12 17:59:30 2021 +0800 [FLINK-19554][connector/testing-framework] Base test class for source with fundamental test cases --- .../common/testsuites/SourceTestSuiteBase.java | 299 +++++++++++++++++++++ .../test/common/utils/TestDataMatchers.java | 246 +++++++++++++++++ .../test/common/utils/IteratorWithCurrentTest.java | 77 ++++++ .../test/common/utils/TestDataMatchersTest.java | 166 ++++++++++++ 4 files changed, 788 insertions(+) diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java new file mode 100644 index 0000000..4b90272 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/testsuites/SourceTestSuiteBase.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.test.common.testsuites; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.api.common.JobStatus; +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.time.Deadline; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.connector.source.Boundedness; +import org.apache.flink.connectors.test.common.environment.ClusterControllable; +import org.apache.flink.connectors.test.common.environment.TestEnvironment; +import org.apache.flink.connectors.test.common.external.ExternalContext; +import org.apache.flink.connectors.test.common.external.SourceSplitDataWriter; +import org.apache.flink.connectors.test.common.junit.extensions.ConnectorTestingExtension; +import org.apache.flink.connectors.test.common.junit.extensions.TestCaseInvocationContextProvider; +import org.apache.flink.connectors.test.common.junit.extensions.TestLoggerExtension; +import org.apache.flink.core.execution.JobClient; +import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperator; +import org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory; +import org.apache.flink.streaming.api.operators.collect.CollectStreamSink; +import org.apache.flink.util.CloseableIterator; + +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.TestInstance; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; + +import static org.apache.flink.connectors.test.common.utils.TestDataMatchers.matchesMultipleSplitTestData; +import static org.apache.flink.connectors.test.common.utils.TestDataMatchers.matchesSplitTestData; +import static org.hamcrest.MatcherAssert.assertThat; + +/** + * Base class for all test suites. + * + * <p>All cases should have well-descriptive JavaDoc, including: + * + * <ul> + * <li>What's the purpose of this case + * <li>Simple description of how this case works + * <li>Condition to fulfill in order to pass this case + * <li>Requirement of running this case + * </ul> + */ +@ExtendWith({ + ConnectorTestingExtension.class, + TestLoggerExtension.class, + TestCaseInvocationContextProvider.class +}) +@TestInstance(TestInstance.Lifecycle.PER_CLASS) +@Experimental +public abstract class SourceTestSuiteBase<T> { + + private static final Logger LOG = LoggerFactory.getLogger(SourceTestSuiteBase.class); + + // ----------------------------- Basic test cases --------------------------------- + + /** + * Test connector source with only one split in the external system. + * + * <p>This test will create one split in the external system, write test data into it, and + * consume back via a Flink job with 1 parallelism. + * + * <p>The number and order of records consumed by Flink need to be identical to the test data + * written to the external system in order to pass this test. + * + * <p>A bounded source is required for this test. + */ + @TestTemplate + @DisplayName("Test source with single split") + public void testSourceSingleSplit(TestEnvironment testEnv, ExternalContext<T> externalContext) + throws Exception { + + // Write test data to external system + final Collection<T> testRecords = generateAndWriteTestData(externalContext); + + // Build and execute Flink job + StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(); + + try (CloseableIterator<T> resultIterator = + execEnv.fromSource( + externalContext.createSource(Boundedness.BOUNDED), + WatermarkStrategy.noWatermarks(), + "Tested Source") + .setParallelism(1) + .executeAndCollect("Source Single Split Test")) { + // Check test result + assertThat(resultIterator, matchesSplitTestData(testRecords)); + } + } + + /** + * Test connector source with multiple splits in the external system + * + * <p>This test will create 4 splits in the external system, write test data to all splits, and + * consume back via a Flink job with 4 parallelism. + * + * <p>The number and order of records in each split consumed by Flink need to be identical to + * the test data written into the external system to pass this test. There's no requirement for + * record order across splits. + * + * <p>A bounded source is required for this test. + */ + @TestTemplate + @DisplayName("Test source with multiple splits") + public void testMultipleSplits(TestEnvironment testEnv, ExternalContext<T> externalContext) + throws Exception { + + final int splitNumber = 4; + final List<Collection<T>> testRecordCollections = new ArrayList<>(); + for (int i = 0; i < splitNumber; i++) { + testRecordCollections.add(generateAndWriteTestData(externalContext)); + } + + LOG.debug("Build and execute Flink job"); + StreamExecutionEnvironment execEnv = testEnv.createExecutionEnvironment(); + + try (final CloseableIterator<T> resultIterator = + execEnv.fromSource( + externalContext.createSource(Boundedness.BOUNDED), + WatermarkStrategy.noWatermarks(), + "Tested Source") + .setParallelism(splitNumber) + .executeAndCollect("Source Multiple Split Test")) { + // Check test result + assertThat(resultIterator, matchesMultipleSplitTestData(testRecordCollections)); + } + } + + /** + * Test connector source with an idle reader. + * + * <p>This test will create 4 split in the external system, write test data to all splits, and + * consume back via a Flink job with 5 parallelism, so at least one parallelism / source reader + * will be idle (assigned with no splits). If the split enumerator of the source doesn't signal + * NoMoreSplitsEvent to the idle source reader, the Flink job will never spin to FINISHED state. + * + * <p>The number and order of records in each split consumed by Flink need to be identical to + * the test data written into the external system to pass this test. There's no requirement for + * record order across splits. + * + * <p>A bounded source is required for this test. + */ + @TestTemplate + @DisplayName("Test source with at least one idle parallelism") + public void testIdleReader(TestEnvironment testEnv, ExternalContext<T> externalContext) + throws Exception { + + final int splitNumber = 4; + final List<Collection<T>> testRecordCollections = new ArrayList<>(); + for (int i = 0; i < splitNumber; i++) { + testRecordCollections.add(generateAndWriteTestData(externalContext)); + } + + try (CloseableIterator<T> resultIterator = + testEnv.createExecutionEnvironment() + .fromSource( + externalContext.createSource(Boundedness.BOUNDED), + WatermarkStrategy.noWatermarks(), + "Tested Source") + .setParallelism(splitNumber + 1) + .executeAndCollect("Redundant Parallelism Test")) { + assertThat(resultIterator, matchesMultipleSplitTestData(testRecordCollections)); + } + } + + /** + * Test connector source with task manager failover. + * + * <p>This test will create 1 split in the external system, write test record set A into the + * split, restart task manager to trigger job failover, write test record set B into the split, + * and terminate the Flink job finally. + * + * <p>The number and order of records consumed by Flink should be identical to A before the + * failover and B after the failover in order to pass the test. + * + * <p>An unbounded source is required for this test, since TaskManager failover will be + * triggered in the middle of the test. + */ + @TestTemplate + @DisplayName("Test TaskManager failure") + public void testTaskManagerFailure( + TestEnvironment testEnv, + ExternalContext<T> externalContext, + ClusterControllable controller) + throws Exception { + + final Collection<T> testRecordsBeforeFailure = + externalContext.generateTestData(ThreadLocalRandom.current().nextLong()); + final SourceSplitDataWriter<T> sourceSplitDataWriter = + externalContext.createSourceSplitDataWriter(); + sourceSplitDataWriter.writeRecords(testRecordsBeforeFailure); + + final StreamExecutionEnvironment env = testEnv.createExecutionEnvironment(); + + env.enableCheckpointing(50); + final DataStreamSource<T> dataStreamSource = + env.fromSource( + externalContext.createSource(Boundedness.CONTINUOUS_UNBOUNDED), + WatermarkStrategy.noWatermarks(), + "Tested Source") + .setParallelism(1); + + // Since DataStream API doesn't expose job client for executeAndCollect(), we have + // to reuse these part of code to get both job client and result iterator :-( + // ------------------------------------ START --------------------------------------------- + TypeSerializer<T> serializer = dataStreamSource.getType().createSerializer(env.getConfig()); + String accumulatorName = "dataStreamCollect_" + UUID.randomUUID(); + CollectSinkOperatorFactory<T> factory = + new CollectSinkOperatorFactory<>(serializer, accumulatorName); + CollectSinkOperator<T> operator = (CollectSinkOperator<T>) factory.getOperator(); + CollectResultIterator<T> iterator = + new CollectResultIterator<>( + operator.getOperatorIdFuture(), + serializer, + accumulatorName, + env.getCheckpointConfig()); + CollectStreamSink<T> sink = new CollectStreamSink<>(dataStreamSource, factory); + sink.name("Data stream collect sink"); + env.addOperator(sink.getTransformation()); + final JobClient jobClient = env.executeAsync("TaskManager Failover Test"); + iterator.setJobClient(jobClient); + // -------------------------------------- END --------------------------------------------- + + assertThat( + iterator, + matchesSplitTestData(testRecordsBeforeFailure, testRecordsBeforeFailure.size())); + + // -------------------------------- Trigger failover --------------------------------------- + controller.triggerTaskManagerFailover(jobClient, () -> {}); + + CommonTestUtils.waitForJobStatus( + jobClient, + Collections.singletonList(JobStatus.RUNNING), + Deadline.fromNow(Duration.ofSeconds(30))); + + final Collection<T> testRecordsAfterFailure = + externalContext.generateTestData(ThreadLocalRandom.current().nextLong()); + sourceSplitDataWriter.writeRecords(testRecordsAfterFailure); + + assertThat( + iterator, + matchesSplitTestData(testRecordsAfterFailure, testRecordsAfterFailure.size())); + + // Clean up + iterator.close(); + CommonTestUtils.terminateJob(jobClient, Duration.ofSeconds(30)); + CommonTestUtils.waitForJobStatus( + jobClient, + Collections.singletonList(JobStatus.CANCELED), + Deadline.fromNow(Duration.ofSeconds(30))); + } + + // ----------------------------- Helper Functions --------------------------------- + + /** + * Generate a set of test records and write it to the given split writer. + * + * @param externalContext External context + * @return Collection of generated test records + */ + protected Collection<T> generateAndWriteTestData(ExternalContext<T> externalContext) { + final Collection<T> testRecordCollection = + externalContext.generateTestData(ThreadLocalRandom.current().nextLong()); + LOG.debug("Writing {} records to external system", testRecordCollection.size()); + externalContext.createSourceSplitDataWriter().writeRecords(testRecordCollection); + return testRecordCollection; + } +} diff --git a/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java new file mode 100644 index 0000000..87d4905 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-testing/src/main/java/org/apache/flink/connectors/test/common/utils/TestDataMatchers.java @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.test.common.utils; + +import org.apache.flink.annotation.Internal; + +import org.hamcrest.Description; +import org.hamcrest.TypeSafeDiagnosingMatcher; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.NoSuchElementException; + +/** Matchers for validating test data. */ +@Internal +public class TestDataMatchers { + + // ---------------------------- Matcher Builders ---------------------------------- + public static <T> MultipleSplitDataMatcher<T> matchesMultipleSplitTestData( + Collection<Collection<T>> testDataCollections) { + return new MultipleSplitDataMatcher<>(testDataCollections); + } + + public static <T> SplitDataMatcher<T> matchesSplitTestData(Collection<T> testData) { + return new SplitDataMatcher<>(testData); + } + + public static <T> SplitDataMatcher<T> matchesSplitTestData(Collection<T> testData, int limit) { + return new SplitDataMatcher<>(testData, limit); + } + + // ---------------------------- Matcher Definitions -------------------------------- + + /** + * Matcher for validating test data in a single split. + * + * @param <T> Type of validating record + */ + public static class SplitDataMatcher<T> extends TypeSafeDiagnosingMatcher<Iterator<T>> { + private static final int UNSET = -1; + + private final Collection<T> testData; + private final int limit; + + private String mismatchDescription = null; + + public SplitDataMatcher(Collection<T> testData) { + this.testData = testData; + this.limit = UNSET; + } + + public SplitDataMatcher(Collection<T> testData, int limit) { + if (limit > testData.size()) { + throw new IllegalArgumentException( + "Limit validation size should be less than number of test records"); + } + this.testData = testData; + this.limit = limit; + } + + @Override + protected boolean matchesSafely(Iterator<T> resultIterator, Description description) { + if (mismatchDescription != null) { + description.appendText(mismatchDescription); + return false; + } + + int recordCounter = 0; + for (T testRecord : testData) { + if (!resultIterator.hasNext()) { + mismatchDescription = "Result data is less than test data"; + return false; + } + T resultRecord = resultIterator.next(); + if (!testRecord.equals(resultRecord)) { + mismatchDescription = + String.format( + "Mismatched record at position %d: Expected '%s' but was '%s'", + recordCounter, testRecord, resultRecord); + return false; + } + recordCounter++; + if (limit != UNSET && recordCounter >= limit) { + break; + } + } + if (limit == UNSET && resultIterator.hasNext()) { + mismatchDescription = "Result data is more than test data"; + return false; + } else { + return true; + } + } + + @Override + public void describeTo(Description description) { + description.appendText( + "Records consumed by Flink should be identical to test data " + + "and preserve the order in split"); + } + } + + /** + * Matcher for validating test data from multiple splits. + * + * <p>Each collection has a pointer (iterator) pointing to current checking record. When a + * record is received in the stream, it will be compared to all current pointing records in + * collections, and the pointer to the identical record will move forward. + * + * <p>If the stream preserves the correctness and order of records in all splits, all pointers + * should reach the end of the collection finally. + * + * @param <T> Type of validating record + */ + public static class MultipleSplitDataMatcher<T> extends TypeSafeDiagnosingMatcher<Iterator<T>> { + + private final List<IteratorWithCurrent<T>> testDataIterators = new ArrayList<>(); + + private String mismatchDescription = null; + + public MultipleSplitDataMatcher(Collection<Collection<T>> testDataCollections) { + for (Collection<T> testDataCollection : testDataCollections) { + testDataIterators.add(new IteratorWithCurrent<>(testDataCollection.iterator())); + } + } + + @Override + protected boolean matchesSafely(Iterator<T> resultIterator, Description description) { + if (mismatchDescription != null) { + description.appendText(mismatchDescription); + return false; + } + while (resultIterator.hasNext()) { + final T record = resultIterator.next(); + if (!matchThenNext(record)) { + mismatchDescription = String.format("Unexpected record '%s'", record); + return false; + } + } + if (!hasReachedEnd()) { + mismatchDescription = "Result data is less than test data"; + return false; + } else { + return true; + } + } + + @Override + public void describeTo(Description description) { + description.appendText( + "Records consumed by Flink should be identical to test data " + + "and preserve the order in multiple splits"); + } + + /** + * Check if any pointing data is identical to the record from the stream, and move the + * pointer to next record if matched. Otherwise throws an exception. + * + * @param record Record from stream + */ + private boolean matchThenNext(T record) { + for (IteratorWithCurrent<T> testDataIterator : testDataIterators) { + if (record.equals(testDataIterator.current())) { + testDataIterator.next(); + return true; + } + } + return false; + } + + /** + * Whether all pointers have reached the end of collections. + * + * @return True if all pointers have reached the end. + */ + private boolean hasReachedEnd() { + for (IteratorWithCurrent<T> testDataIterator : testDataIterators) { + if (testDataIterator.hasNext()) { + return false; + } + } + return true; + } + } + + /** + * An iterator wrapper which can access the element that the iterator is currently pointing to. + * + * @param <E> The type of elements returned by this iterator. + */ + static class IteratorWithCurrent<E> implements Iterator<E> { + + private final Iterator<E> originalIterator; + private E current; + + public IteratorWithCurrent(Iterator<E> originalIterator) { + this.originalIterator = originalIterator; + try { + current = originalIterator.next(); + } catch (NoSuchElementException e) { + current = null; + } + } + + @Override + public boolean hasNext() { + return current != null; + } + + @Override + public E next() { + if (current == null) { + throw new NoSuchElementException(); + } + E previous = current; + if (originalIterator.hasNext()) { + current = originalIterator.next(); + } else { + current = null; + } + return previous; + } + + public E current() { + return current; + } + } +} diff --git a/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/IteratorWithCurrentTest.java b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/IteratorWithCurrentTest.java new file mode 100644 index 0000000..97a7b48 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/IteratorWithCurrentTest.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.test.common.utils; + +import org.apache.flink.connectors.test.common.utils.TestDataMatchers.IteratorWithCurrent; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.NoSuchElementException; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +/** Unit test for {@link IteratorWithCurrent}. */ +public class IteratorWithCurrentTest { + + @Test + public void testIterator() { + List<Integer> numberList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + numberList.add(i); + } + IteratorWithCurrent<Integer> iterator = new IteratorWithCurrent<>(numberList.iterator()); + Integer num = 0; + while (iterator.hasNext()) { + assertEquals(num, iterator.next()); + num++; + } + assertEquals(10, num.intValue()); + assertNull(iterator.current()); + } + + @Test + public void testEmptyList() { + IteratorWithCurrent<Integer> iterator = + new IteratorWithCurrent<>(Collections.emptyIterator()); + assertNull(iterator.current()); + assertThrows(NoSuchElementException.class, iterator::next); + } + + @Test + public void testCurrentElement() { + List<Integer> numberList = new ArrayList<>(); + for (int i = 0; i < 10; i++) { + numberList.add(i); + } + IteratorWithCurrent<Integer> iterator = new IteratorWithCurrent<>(numberList.iterator()); + Integer num = 0; + while (iterator.hasNext()) { + assertEquals(num, iterator.current()); + assertEquals(num, iterator.next()); + num++; + } + assertEquals(10, num.intValue()); + assertNull(iterator.current()); + } +} diff --git a/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java new file mode 100644 index 0000000..1833185 --- /dev/null +++ b/flink-test-utils-parent/flink-connector-testing/src/test/java/org/apache/flink/connectors/test/common/utils/TestDataMatchersTest.java @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.connectors.test.common.utils; + +import org.hamcrest.Description; +import org.hamcrest.StringDescription; +import org.hamcrest.TypeSafeDiagnosingMatcher; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertFalse; + +/** Unit test for {@link TestDataMatchers}. */ +public class TestDataMatchersTest { + @Nested + class SplitDataMatcherTest { + private final List<String> testData = Arrays.asList("alpha", "beta", "gamma"); + + @Test + public void testPositiveCases() { + assertThat(testData.iterator(), TestDataMatchers.matchesSplitTestData(testData)); + assertThat( + testData.iterator(), + TestDataMatchers.matchesSplitTestData(testData, testData.size())); + assertThat( + testData.iterator(), + TestDataMatchers.matchesSplitTestData(testData, testData.size() - 1)); + } + + @Test + public void testMismatch() throws Exception { + final List<String> resultData = new ArrayList<>(testData); + resultData.set(1, "delta"); + final Iterator<String> resultIterator = resultData.iterator(); + + final TestDataMatchers.SplitDataMatcher<String> matcher = + TestDataMatchers.matchesSplitTestData(testData); + + assertMatcherFailedWithDescription( + resultIterator, + matcher, + "Mismatched record at position 1: Expected 'beta' but was 'delta'"); + } + + @Test + public void testResultMoreThanExpected() throws Exception { + final List<String> resultData = new ArrayList<>(testData); + resultData.add("delta"); + final Iterator<String> resultIterator = resultData.iterator(); + + final TestDataMatchers.SplitDataMatcher<String> matcher = + TestDataMatchers.matchesSplitTestData(testData); + + assertMatcherFailedWithDescription( + resultIterator, matcher, "Result data is more than test data"); + } + + @Test + public void testResultLessThanExpected() throws Exception { + final List<String> resultData = new ArrayList<>(testData); + resultData.remove(testData.size() - 1); + final Iterator<String> resultIterator = resultData.iterator(); + + final TestDataMatchers.SplitDataMatcher<String> matcher = + TestDataMatchers.matchesSplitTestData(testData); + + assertMatcherFailedWithDescription( + resultIterator, matcher, "Result data is less than test data"); + } + } + + @Nested + class MultipleSplitDataMatcherTest { + private final List<String> splitA = Arrays.asList("alpha", "beta", "gamma"); + private final List<String> splitB = Arrays.asList("one", "two", "three"); + private final List<String> splitC = Arrays.asList("1", "2", "3"); + private final List<Collection<String>> testDataCollection = + Arrays.asList(splitA, splitB, splitC); + + @Test + public void testPositiveCase() { + final List<String> result = unionLists(splitA, splitB, splitC); + assertThat( + result.iterator(), + TestDataMatchers.matchesMultipleSplitTestData(testDataCollection)); + } + + @Test + public void testResultLessThanExpected() throws Exception { + final ArrayList<String> splitATestDataWithoutLast = new ArrayList<>(splitA); + splitATestDataWithoutLast.remove(splitA.size() - 1); + final List<String> result = unionLists(splitATestDataWithoutLast, splitB, splitC); + final TestDataMatchers.MultipleSplitDataMatcher<String> matcher = + TestDataMatchers.matchesMultipleSplitTestData(testDataCollection); + assertMatcherFailedWithDescription( + result.iterator(), matcher, "Result data is less than test data"); + } + + @Test + public void testResultMoreThanExpected() throws Exception { + final List<String> result = unionLists(splitA, splitB, splitC); + result.add("delta"); + final TestDataMatchers.MultipleSplitDataMatcher<String> matcher = + TestDataMatchers.matchesMultipleSplitTestData(testDataCollection); + assertMatcherFailedWithDescription( + result.iterator(), matcher, "Unexpected record 'delta'"); + } + + @Test + public void testOutOfOrder() throws Exception { + List<String> reverted = new ArrayList<>(splitC); + Collections.reverse(reverted); + final List<String> result = unionLists(splitA, splitB, reverted); + final TestDataMatchers.MultipleSplitDataMatcher<String> matcher = + TestDataMatchers.matchesMultipleSplitTestData(testDataCollection); + assertMatcherFailedWithDescription(result.iterator(), matcher, "Unexpected record '3'"); + } + } + + @SafeVarargs + private final <T> List<T> unionLists(List<T>... lists) { + return Stream.of(lists).flatMap(Collection::stream).collect(Collectors.toList()); + } + + private <T> void assertMatcherFailedWithDescription( + T object, TypeSafeDiagnosingMatcher<T> matcher, String expectedDescription) + throws Exception { + final Method method = + TypeSafeDiagnosingMatcher.class.getDeclaredMethod( + "matchesSafely", Object.class, Description.class); + method.setAccessible(true); + assertFalse((boolean) method.invoke(matcher, object, new Description.NullDescription())); + + final StringDescription actualDescription = new StringDescription(); + method.invoke(matcher, object, actualDescription); + Assertions.assertEquals(expectedDescription, actualDescription.toString()); + } +}
