This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit bb234caa97eb49c341bb00831bb4e8e1e61b366a
Author: Piotr Nowojski <[email protected]>
AuthorDate: Tue Sep 22 14:14:52 2020 +0200

    [backport] Relevant test mock changes from "[FLINK-18907][test] Add stream status forwarding test for chained sources"
---
 .../org/apache/flink/api/connector/source/mocks/MockSource.java  | 8 +++++---
 .../flink/api/connector/source/mocks/MockSourceReader.java       | 9 +++++++--
 2 files changed, 12 insertions(+), 5 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
index 1c29147..b860f27 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
@@ -41,17 +41,19 @@ public class MockSource implements Source<Integer, MockSourceSplit, Set<MockSour
        private final Boundedness boundedness;
        private final int numSplits;
        private final boolean readerWaitingForMoreSplits;
+       private final boolean readerMarkIdleOnNoSplits;
        protected List<MockSourceReader> createdReaders;
 
        public MockSource(Boundedness boundedness, int numSplits) {
-               this(boundedness, numSplits, false);
+               this(boundedness, numSplits, false, false);
        }
 
-       public MockSource(Boundedness boundedness, int numSplits, boolean 
readerWaitingForMoreSplits) {
+       public MockSource(Boundedness boundedness, int numSplits, boolean 
readerWaitingForMoreSplits, boolean readerMarkIdleOnNoSplits) {
                this.boundedness = boundedness;
                this.numSplits = numSplits;
                this.createdReaders = new ArrayList<>();
                this.readerWaitingForMoreSplits = readerWaitingForMoreSplits;
+               this.readerMarkIdleOnNoSplits = readerMarkIdleOnNoSplits;
        }
 
        @Override
@@ -61,7 +63,7 @@ public class MockSource implements Source<Integer, MockSourceSplit, Set<MockSour
 
        @Override
        public SourceReader<Integer, MockSourceSplit> 
createReader(SourceReaderContext readerContext) {
-               MockSourceReader mockSourceReader = new 
MockSourceReader(readerWaitingForMoreSplits);
+               MockSourceReader mockSourceReader = new 
MockSourceReader(readerWaitingForMoreSplits, readerMarkIdleOnNoSplits);
                createdReaders.add(mockSourceReader);
                return mockSourceReader;
        }
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index 4910f74..e48f827 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -35,6 +35,7 @@ import java.util.concurrent.CompletableFuture;
 public class MockSourceReader implements SourceReader<Integer, 
MockSourceSplit> {
        private final List<MockSourceSplit> assignedSplits = new ArrayList<>();
        private final List<SourceEvent> receivedSourceEvents = new 
ArrayList<>();
+       private final boolean markIdleOnNoSplits;
 
        private int currentSplitIndex = 0;
        private boolean started;
@@ -45,14 +46,15 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
        private CompletableFuture<Void> availableFuture;
 
        public MockSourceReader() {
-               this(false);
+               this(false, false);
        }
 
-       public MockSourceReader(boolean waitingForMoreSplits) {
+       public MockSourceReader(boolean waitingForMoreSplits, boolean 
markIdleOnNoSplits) {
                this.started = false;
                this.closed = false;
                this.availableFuture = CompletableFuture.completedFuture(null);
                this.waitingForMoreSplits = waitingForMoreSplits;
+               this.markIdleOnNoSplits = markIdleOnNoSplits;
        }
 
        @Override
@@ -79,6 +81,9 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
                        return InputStatus.END_OF_INPUT;
                }
                else {
+                       if (markIdleOnNoSplits) {
+                               sourceOutput.markIdle();
+                       }
                        markUnavailable();
                        return InputStatus.NOTHING_AVAILABLE;
                }

Reply via email to