This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit bb234caa97eb49c341bb00831bb4e8e1e61b366a Author: Piotr Nowojski <[email protected]> AuthorDate: Tue Sep 22 14:14:52 2020 +0200 [backport] Relevant test mock changes from "[FLINK-18907][test] Add stream status forwarding test for chained sources" --- .../org/apache/flink/api/connector/source/mocks/MockSource.java | 8 +++++--- .../flink/api/connector/source/mocks/MockSourceReader.java | 9 +++++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java index 1c29147..b860f27 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java @@ -41,17 +41,19 @@ public class MockSource implements Source<Integer, MockSourceSplit, Set<MockSour private final Boundedness boundedness; private final int numSplits; private final boolean readerWaitingForMoreSplits; + private final boolean readerMarkIdleOnNoSplits; protected List<MockSourceReader> createdReaders; public MockSource(Boundedness boundedness, int numSplits) { - this(boundedness, numSplits, false); + this(boundedness, numSplits, false, false); } - public MockSource(Boundedness boundedness, int numSplits, boolean readerWaitingForMoreSplits) { + public MockSource(Boundedness boundedness, int numSplits, boolean readerWaitingForMoreSplits, boolean readerMarkIdleOnNoSplits) { this.boundedness = boundedness; this.numSplits = numSplits; this.createdReaders = new ArrayList<>(); this.readerWaitingForMoreSplits = readerWaitingForMoreSplits; + this.readerMarkIdleOnNoSplits = readerMarkIdleOnNoSplits; } @Override @@ -61,7 +63,7 @@ public class MockSource implements Source<Integer, MockSourceSplit, Set<MockSour @Override public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) { - MockSourceReader mockSourceReader = new MockSourceReader(readerWaitingForMoreSplits); + MockSourceReader mockSourceReader = new MockSourceReader(readerWaitingForMoreSplits, readerMarkIdleOnNoSplits); createdReaders.add(mockSourceReader); return mockSourceReader; } diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java index 4910f74..e48f827 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java @@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture; public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> { private final List<MockSourceSplit> assignedSplits = new ArrayList<>(); private final List<SourceEvent> receivedSourceEvents = new ArrayList<>(); + private final boolean markIdleOnNoSplits; private int currentSplitIndex = 0; private boolean started; @@ -45,14 +46,15 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> private CompletableFuture<Void> availableFuture; public MockSourceReader() { - this(false); + this(false, false); } - public MockSourceReader(boolean waitingForMoreSplits) { + public MockSourceReader(boolean waitingForMoreSplits, boolean markIdleOnNoSplits) { this.started = false; this.closed = false; this.availableFuture = CompletableFuture.completedFuture(null); this.waitingForMoreSplits = waitingForMoreSplits; + this.markIdleOnNoSplits = markIdleOnNoSplits; } @Override @@ -79,6 +81,9 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> return InputStatus.END_OF_INPUT; } else { + if (markIdleOnNoSplits) { + sourceOutput.markIdle(); + } markUnavailable(); return InputStatus.NOTHING_AVAILABLE; }
