This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 21283ee197d485352f1d9fcee0e889e6e0d3ecb2 Author: Piotr Nowojski <[email protected]> AuthorDate: Mon Sep 21 15:18:25 2020 +0200 [backport] Relevant test mock changes from "[FLINK-18907][task] Add test coverage for watermarks with chained sources" --- .../flink/api/connector/source/mocks/MockSource.java | 10 ++++++++-- .../flink/api/connector/source/mocks/MockSourceReader.java | 14 +++++++++++++- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java index c48b380..1c29147 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java @@ -40,12 +40,18 @@ public class MockSource implements Source<Integer, MockSourceSplit, Set<MockSour private final Boundedness boundedness; private final int numSplits; - private List<MockSourceReader> createdReaders; + private final boolean readerWaitingForMoreSplits; + protected List<MockSourceReader> createdReaders; public MockSource(Boundedness boundedness, int numSplits) { + this(boundedness, numSplits, false); + } + + public MockSource(Boundedness boundedness, int numSplits, boolean readerWaitingForMoreSplits) { this.boundedness = boundedness; this.numSplits = numSplits; this.createdReaders = new ArrayList<>(); + this.readerWaitingForMoreSplits = readerWaitingForMoreSplits; } @Override @@ -55,7 +61,7 @@ public class MockSource implements Source<Integer, MockSourceSplit, Set<MockSour @Override public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) { - MockSourceReader mockSourceReader = new MockSourceReader(); + MockSourceReader mockSourceReader = new MockSourceReader(readerWaitingForMoreSplits); createdReaders.add(mockSourceReader); return mockSourceReader; } diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java index d474e1f..58ae951 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java @@ -39,14 +39,19 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> private int currentSplitIndex = 0; private boolean started; private boolean closed; + private boolean waitingForMoreSplits; @GuardedBy("this") private CompletableFuture<Void> availableFuture; public MockSourceReader() { + this(false); + } + public MockSourceReader(boolean waitingForMoreSplits) { this.started = false; this.closed = false; this.availableFuture = CompletableFuture.completedFuture(null); + this.waitingForMoreSplits = waitingForMoreSplits; } @Override @@ -56,7 +61,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> @Override public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception { - boolean finished = true; + boolean finished = !waitingForMoreSplits; currentSplitIndex = 0; // Find first splits with available records. while (currentSplitIndex < assignedSplits.size() @@ -96,6 +101,10 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> @Override public void handleSourceEvents(SourceEvent sourceEvent) { + if (sourceEvent instanceof MockNoMoreSplitsEvent) { + waitingForMoreSplits = false; + markAvailable(); + } receivedSourceEvents.add(sourceEvent); } @@ -139,4 +148,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> public List<SourceEvent> getReceivedSourceEvents() { return receivedSourceEvents; } + + public static class MockNoMoreSplitsEvent implements SourceEvent { + } }
