This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b0cacf9719320a07cb36e1bafeea9c9ae3409cd
Author: Stephan Ewen <[email protected]>
AuthorDate: Wed Nov 4 22:22:30 2020 +0100

    [FLINK-19265][core] Add to source coordinator built-in methods to signal 
"no more splits".
    
    This replaces custom events and event handling implemented by many sources.
---
 .../base/source/reader/SourceReaderBase.java       | 17 ++++++++--------
 .../base/source/reader/SourceReaderBaseTest.java   |  9 ++++-----
 .../source/reader/mocks/MockSplitEnumerator.java   |  3 +--
 .../flink/api/connector/source/SourceReader.java   | 14 ++++++++++++-
 .../connector/source/SplitEnumeratorContext.java   |  8 ++++++++
 .../source/lib/util/IteratorSourceEnumerator.java  |  3 +--
 .../source/lib/util/IteratorSourceReader.java      | 23 ++++++++++------------
 .../connector/source/mocks/MockSourceReader.java   | 20 ++++++++-----------
 .../coordinator/SourceCoordinatorContext.java      | 14 +++++++++++++
 .../runtime}/source/event/NoMoreSplitsEvent.java   |  6 +++---
 .../streaming/api/operators/SourceOperator.java    |  3 +++
 .../source/SourceOperatorEventTimeTest.java        |  3 +--
 12 files changed, 74 insertions(+), 49 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 503e6c9..430f629 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -220,13 +219,15 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
        }
 
        @Override
+       public void notifyNoMoreSplits() {
+               LOG.info("Reader received NoMoreSplits event.");
+               noMoreSplitsAssignment = true;
+               elementsQueue.notifyAvailable();
+       }
+
+       @Override
        public void handleSourceEvents(SourceEvent sourceEvent) {
-               LOG.trace("Handling source event: {}", sourceEvent);
-               if (sourceEvent instanceof NoMoreSplitsEvent) {
-                       LOG.info("Reader received NoMoreSplits event.");
-                       noMoreSplitsAssignment = true;
-                       elementsQueue.notifyAvailable();
-               }
+               LOG.info("Received unhandled source event: {}", sourceEvent);
        }
 
        @Override
@@ -235,8 +236,6 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
                splitFetcherManager.close(options.sourceReaderCloseTimeout);
        }
 
-
-
        // -------------------- Abstract method to allow different 
implementations ------------------
        /**
         * Handles the finished splits to clean the state if needed.
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index 3efe222..764b203 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.connector.base.source.reader;
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.TestingReaderContext;
 import org.apache.flink.api.connector.source.mocks.TestingReaderOutput;
@@ -95,7 +94,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                        reader.addSplits(Collections.singletonList(getSplit(0,
                                NUM_RECORDS_PER_SPLIT,
                                Boundedness.CONTINUOUS_UNBOUNDED)));
-                       reader.handleSourceEvents(new NoMoreSplitsEvent());
+                       reader.notifyNoMoreSplits();
                        // This is not a real infinite loop, it is supposed to 
throw exception after two polls.
                        while (true) {
                                InputStatus inputStatus = 
reader.pollNext(output);
@@ -151,7 +150,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                        getSplit(1, 12, Boundedness.BOUNDED)
                );
                reader.addSplits(splits);
-               reader.handleSourceEvents(new NoMoreSplitsEvent());
+               reader.notifyNoMoreSplits();
 
                while (true) {
                        InputStatus status = reader.pollNext(new 
TestingReaderOutput<>());
@@ -185,7 +184,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                        getSplit(1, 10, Boundedness.BOUNDED)
                );
                reader.addSplits(splits);
-               reader.handleSourceEvents(new NoMoreSplitsEvent());
+               reader.notifyNoMoreSplits();
 
                while (true) {
                        InputStatus status = reader.pollNext(new 
TestingReaderOutput<>());
@@ -216,7 +215,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                // Create and add a split that only contains one record
                final MockSourceSplit split = new MockSourceSplit(0, 0, 1);
                sourceReader.addSplits(Collections.singletonList(split));
-               sourceReader.handleSourceEvents(new NoMoreSplitsEvent());
+               sourceReader.notifyNoMoreSplits();
 
                // Add the last record to the split when the 
splitFetcherManager shutting down SplitFetchers
                
splitFetcherManager.getInShutdownSplitFetcherFuture().thenRun(() -> 
split.addRecord(1));
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
index 8df3394..77da1ba 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 
 import java.io.IOException;
@@ -73,7 +72,7 @@ public class MockSplitEnumerator implements 
SplitEnumerator<MockSourceSplit, Lis
                        context.assignSplits(new 
SplitsAssignment<>(assignment));
                        splits.clear();
                        for (int i = 0; i < numReaders; i++) {
-                               context.sendEventToSourceReader(i, new 
NoMoreSplitsEvent());
+                               context.signalNoMoreSplits(i);
                        }
                }
        }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
index 0aac1b1..6dad2d5 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
@@ -74,11 +74,23 @@ public interface SourceReader<T, SplitT extends SourceSplit>
        void addSplits(List<SplitT> splits);
 
        /**
+        * This method is called when the reader is notified that it will not
+        * receive any further splits.
+        *
+        * <p>It is triggered when the enumerator calls {@link 
SplitEnumeratorContext#signalNoMoreSplits(int)}
+        * with the reader's parallel subtask.
+        */
+       void notifyNoMoreSplits();
+
+       /**
         * Handle a source event sent by the {@link SplitEnumerator}.
         *
+        * <p>This method has a default implementation that does nothing, 
because
+        * most sources do not require any custom events.
+        *
         * @param sourceEvent the event sent by the {@link SplitEnumerator}.
         */
-       void handleSourceEvents(SourceEvent sourceEvent);
+       default void handleSourceEvents(SourceEvent sourceEvent) {}
 
        /**
         * We have an empty default implementation here because most source 
readers do not have
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index dbf8308..9ce47ef 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -84,6 +84,14 @@ public interface SplitEnumeratorContext<SplitT extends 
SourceSplit> {
        }
 
        /**
+        * Signals a subtask that it will not receive any further splits.
+        *
+        * @param subtask The index of the operator's parallel subtask that 
shall be
+        *                signaled it will not receive any further splits.
+        */
+       void signalNoMoreSplits(int subtask);
+
+       /**
         * Invoke the callable and handover the return value to the handler 
which will be executed
         * by the source coordinator. When this method is invoked multiple 
times, The <code>Coallble</code>s
         * may be executed in a thread pool concurrently.
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
index 91bc283..61e806d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.connector.source.lib.util;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.api.connector.source.event.RequestSplitEvent;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -66,7 +65,7 @@ public class IteratorSourceEnumerator<SplitT extends 
IteratorSourceSplit<?, ?>>
                if (nextSplit != null) {
                        context.assignSplit(nextSplit, subtaskId);
                } else {
-                       context.sendEventToSourceReader(subtaskId, new 
NoMoreSplitsEvent());
+                       context.signalNoMoreSplits(subtaskId);
                }
        }
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
index 3af75e1..5c6631e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
@@ -19,13 +19,10 @@
 package org.apache.flink.api.connector.source.lib.util;
 
 import org.apache.flink.api.connector.source.ReaderOutput;
-import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.api.connector.source.event.RequestSplitEvent;
 import org.apache.flink.core.io.InputStatus;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.annotation.Nullable;
 
@@ -122,6 +119,16 @@ public class IteratorSourceReader<E, IterT extends 
Iterator<E>, SplitT extends I
        }
 
        @Override
+       public void notifyNoMoreSplits() {
+               // if we get this after we already had a split, we must have 
requested more than
+               // one split, which is not expected here.
+               checkState(remainingSplits == null, "Unexpected response, 
requested more than one split.");
+
+               // non-null queue signals splits were assigned, in this case no 
splits
+               remainingSplits = new ArrayDeque<>();
+       }
+
+       @Override
        public List<SplitT> snapshotState(long checkpointId) {
                final ArrayList<SplitT> allSplits = new ArrayList<>(1 + 
remainingSplits.size());
                if (iterator != null) {
@@ -134,15 +141,5 @@ public class IteratorSourceReader<E, IterT extends 
Iterator<E>, SplitT extends I
        }
 
        @Override
-       public void handleSourceEvents(SourceEvent sourceEvent) {
-               if (sourceEvent instanceof NoMoreSplitsEvent) {
-                       // non-null queue signals splits were assigned, in this 
case no splits
-                       remainingSplits = new ArrayDeque<>();
-               } else {
-                       throw new FlinkRuntimeException("Unexpected event: " + 
sourceEvent);
-               }
-       }
-
-       @Override
        public void close() throws Exception {}
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index b0a24dd..8eb4596 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -108,12 +108,9 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
        }
 
        @Override
-       public void handleSourceEvents(SourceEvent sourceEvent) {
-               if (sourceEvent instanceof MockNoMoreSplitsEvent) {
-                       waitingForMoreSplits = false;
-                       markAvailable();
-               }
-               receivedSourceEvents.add(sourceEvent);
+       public void notifyNoMoreSplits() {
+               waitingForMoreSplits = false;
+               markAvailable();
        }
 
        @Override
@@ -137,6 +134,11 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
                }
        }
 
+       @Override
+       public void handleSourceEvents(SourceEvent sourceEvent) {
+               receivedSourceEvents.add(sourceEvent);
+       }
+
        // --------------- methods for unit tests ---------------
 
        public void markAvailable() {
@@ -174,10 +176,4 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
        public List<Long> getAbortedCheckpoints() {
                return abortedCheckpoints;
        }
-
-       /**
-        * Simple event allowing {@link MockSourceReader} to finish when 
requested.
-        */
-       public static class MockNoMoreSplitsEvent implements SourceEvent {
-       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 062e2d4..6141688 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.Preconditions;
@@ -184,6 +185,19 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
        }
 
        @Override
+       public void signalNoMoreSplits(int subtask) {
+               // Ensure the split assignment is done by the coordinator 
executor.
+               callInCoordinatorThread(() -> {
+                       try {
+                               operatorCoordinatorContext.sendEvent(new 
NoMoreSplitsEvent(), subtask);
+                               return null; // void return value
+                       } catch (TaskNotRunningException e) {
+                               throw new FlinkRuntimeException("Failed to send 
'NoMoreSplits' to reader " + subtask, e);
+                       }
+               }, "Failed to send 'NoMoreSplits' to reader " + subtask);
+       }
+
+       @Override
        public <T> void callAsync(
                        Callable<T> callable,
                        BiConsumer<T, Throwable> handler,
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java
similarity index 87%
rename from 
flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java
index aa26808..18d8433 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.api.connector.source.event;
+package org.apache.flink.runtime.source.event;
 
-import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
 /**
  * A source event sent from the SplitEnumerator to the SourceReader to 
indicate that no more
  * splits will be assigned to the source reader anymore. So once the 
SplitReader finishes
  * reading the currently assigned splits, they can exit.
  */
-public class NoMoreSplitsEvent implements SourceEvent {
+public class NoMoreSplitsEvent implements OperatorEvent {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 799b2a1..bfe51e2 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -264,6 +265,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
                        }
                } else if (event instanceof SourceEventWrapper) {
                        sourceReader.handleSourceEvents(((SourceEventWrapper) 
event).getSourceEvent());
+               } else if (event instanceof NoMoreSplitsEvent) {
+                       sourceReader.notifyNoMoreSplits();
                } else {
                        throw new IllegalStateException("Received unexpected 
operator event " + event);
                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
index d48ed13..d47d309 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.connector.source.ReaderOutput;
-import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -252,7 +251,7 @@ public class SourceOperatorEventTimeTest {
                public void addSplits(List<MockSourceSplit> splits) {}
 
                @Override
-               public void handleSourceEvents(SourceEvent sourceEvent) {}
+               public void notifyNoMoreSplits() {}
 
                @Override
                public void close() {}

Reply via email to