This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 21283ee197d485352f1d9fcee0e889e6e0d3ecb2
Author: Piotr Nowojski <[email protected]>
AuthorDate: Mon Sep 21 15:18:25 2020 +0200

    [backport] Relevant test mock changes from "[FLINK-18907][task] Add test coverage for watermarks with chained sources"
---
 .../flink/api/connector/source/mocks/MockSource.java       | 10 ++++++++--
 .../flink/api/connector/source/mocks/MockSourceReader.java | 14 +++++++++++++-
 2 files changed, 21 insertions(+), 3 deletions(-)

diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
index c48b380..1c29147 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSource.java
@@ -40,12 +40,18 @@ public class MockSource implements Source<Integer, MockSourceSplit, Set<MockSour
 
        private final Boundedness boundedness;
        private final int numSplits;
-       private List<MockSourceReader> createdReaders;
+       private final boolean readerWaitingForMoreSplits;
+       protected List<MockSourceReader> createdReaders;
 
        public MockSource(Boundedness boundedness, int numSplits) {
+               this(boundedness, numSplits, false);
+       }
+
+       public MockSource(Boundedness boundedness, int numSplits, boolean readerWaitingForMoreSplits) {
                this.boundedness = boundedness;
                this.numSplits = numSplits;
                this.createdReaders = new ArrayList<>();
+               this.readerWaitingForMoreSplits = readerWaitingForMoreSplits;
        }
 
        @Override
@@ -55,7 +61,7 @@ public class MockSource implements Source<Integer, MockSourceSplit, Set<MockSour
 
        @Override
        public SourceReader<Integer, MockSourceSplit> createReader(SourceReaderContext readerContext) {
-               MockSourceReader mockSourceReader = new MockSourceReader();
+               MockSourceReader mockSourceReader = new MockSourceReader(readerWaitingForMoreSplits);
                createdReaders.add(mockSourceReader);
                return mockSourceReader;
        }
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index d474e1f..58ae951 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -39,14 +39,19 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
        private int currentSplitIndex = 0;
        private boolean started;
        private boolean closed;
+       private boolean waitingForMoreSplits;
 
        @GuardedBy("this")
        private CompletableFuture<Void> availableFuture;
 
        public MockSourceReader() {
+               this(false);
+       }
+       public MockSourceReader(boolean waitingForMoreSplits) {
                this.started = false;
                this.closed = false;
                this.availableFuture = CompletableFuture.completedFuture(null);
+               this.waitingForMoreSplits = waitingForMoreSplits;
        }
 
        @Override
@@ -56,7 +61,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
 
        @Override
        public InputStatus pollNext(ReaderOutput<Integer> sourceOutput) throws Exception {
-               boolean finished = true;
+               boolean finished = !waitingForMoreSplits;
                currentSplitIndex = 0;
                // Find first splits with available records.
                while (currentSplitIndex < assignedSplits.size()
@@ -96,6 +101,10 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
 
        @Override
        public void handleSourceEvents(SourceEvent sourceEvent) {
+               if (sourceEvent instanceof MockNoMoreSplitsEvent) {
+                       waitingForMoreSplits = false;
+                       markAvailable();
+               }
                receivedSourceEvents.add(sourceEvent);
        }
 
@@ -139,4 +148,7 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit>
        public List<SourceEvent> getReceivedSourceEvents() {
                return receivedSourceEvents;
        }
+
+       public static class MockNoMoreSplitsEvent implements SourceEvent {
+       }
 }

Reply via email to