This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 36498a0b767a53c0782cc4fafeb62bdb3211b0b4
Author: Stephan Ewen <[email protected]>
AuthorDate: Wed Nov 4 22:22:30 2020 +0100

    [FLINK-19265][core] Add to source coordinator built-in methods to signal 
"no more splits".
    
    This replaces custom events and event handling implemented by many sources.
---
 .../base/source/reader/SourceReaderBase.java       | 17 ++++++++--------
 .../base/source/reader/SourceReaderBaseTest.java   |  3 +--
 .../source/reader/mocks/MockSplitEnumerator.java   |  3 +--
 .../file/src/impl/StaticFileSplitEnumerator.java   |  3 +--
 .../file/src/FileSourceHeavyThroughputTest.java    |  3 +--
 .../source/enumerator/KafkaSourceEnumerator.java   |  3 +--
 .../flink/api/connector/source/SourceReader.java   | 14 ++++++++++++-
 .../connector/source/SplitEnumeratorContext.java   |  8 ++++++++
 .../source/lib/util/IteratorSourceEnumerator.java  |  3 +--
 .../source/lib/util/IteratorSourceReader.java      | 23 ++++++++++------------
 .../connector/source/mocks/MockSourceReader.java   | 20 ++++++++-----------
 .../source/mocks/MockSplitEnumeratorContext.java   |  3 +++
 .../coordinator/SourceCoordinatorContext.java      | 14 +++++++++++++
 .../runtime}/source/event/NoMoreSplitsEvent.java   |  6 +++---
 .../streaming/api/operators/SourceOperator.java    |  3 +++
 .../source/SourceOperatorEventTimeTest.java        |  3 +--
 .../runtime/tasks/MultipleInputStreamTaskTest.java |  6 +++---
 .../checkpointing/UnalignedCheckpointITCase.java   |  3 +--
 18 files changed, 81 insertions(+), 57 deletions(-)

diff --git 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
index 5b4d6f7..b8dc9ef 100644
--- 
a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
+++ 
b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.SourceSplit;
-import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.configuration.Configuration;
 import 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager;
 import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
@@ -220,13 +219,15 @@ public abstract class SourceReaderBase<E, T, SplitT 
extends SourceSplit, SplitSt
        }
 
        @Override
+       public void notifyNoMoreSplits() {
+               LOG.info("Reader received NoMoreSplits event.");
+               noMoreSplitsAssignment = true;
+               elementsQueue.notifyAvailable();
+       }
+
+       @Override
        public void handleSourceEvents(SourceEvent sourceEvent) {
-               LOG.trace("Handling source event: {}", sourceEvent);
-               if (sourceEvent instanceof NoMoreSplitsEvent) {
-                       LOG.info("Reader received NoMoreSplits event.");
-                       noMoreSplitsAssignment = true;
-                       elementsQueue.notifyAvailable();
-               }
+               LOG.info("Received unhandled source event: {}", sourceEvent);
        }
 
        @Override
@@ -235,8 +236,6 @@ public abstract class SourceReaderBase<E, T, SplitT extends 
SourceSplit, SplitSt
                splitFetcherManager.close(options.sourceReaderCloseTimeout);
        }
 
-
-
        // -------------------- Abstract method to allow different 
implementations ------------------
        /**
         * Handles the finished splits to clean the state if needed.
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
index ff38f07..5c8d9d3 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.connector.base.source.reader;
 
 import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.connector.source.SourceReader;
-import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.TestingReaderContext;
 import org.apache.flink.api.connector.source.mocks.TestingReaderOutput;
@@ -90,7 +89,7 @@ public class SourceReaderBaseTest extends 
SourceReaderTestBase<MockSourceSplit>
                        reader.addSplits(Collections.singletonList(getSplit(0,
                                NUM_RECORDS_PER_SPLIT,
                                Boundedness.CONTINUOUS_UNBOUNDED)));
-                       reader.handleSourceEvents(new NoMoreSplitsEvent());
+                       reader.notifyNoMoreSplits();
                        // This is not a real infinite loop, it is supposed to 
throw exception after two polls.
                        while (true) {
                                InputStatus inputStatus = 
reader.pollNext(output);
diff --git 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
index 8df3394..77da1ba 100644
--- 
a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 
 import java.io.IOException;
@@ -73,7 +72,7 @@ public class MockSplitEnumerator implements 
SplitEnumerator<MockSourceSplit, Lis
                        context.assignSplits(new 
SplitsAssignment<>(assignment));
                        splits.clear();
                        for (int i = 0; i < numReaders; i++) {
-                               context.sendEventToSourceReader(i, new 
NoMoreSplitsEvent());
+                               context.signalNoMoreSplits(i);
                        }
                }
        }
diff --git 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
index f194dc4..de10682 100644
--- 
a/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
+++ 
b/flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/src/impl/StaticFileSplitEnumerator.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.api.connector.source.event.RequestSplitEvent;
 import org.apache.flink.connector.file.src.FileSource;
 import org.apache.flink.connector.file.src.FileSourceSplit;
@@ -121,7 +120,7 @@ public class StaticFileSplitEnumerator implements 
SplitEnumerator<FileSourceSpli
                        LOG.info("Assigned split to subtask {} : {}", subtask, 
split);
                }
                else {
-                       context.sendEventToSourceReader(subtask, new 
NoMoreSplitsEvent());
+                       context.signalNoMoreSplits(subtask);
                        LOG.info("No more splits available for subtask {}", 
subtask);
                }
        }
diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
index b54c7f3..a789605 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/src/FileSourceHeavyThroughputTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.file.src.reader.SimpleStreamFormat;
 import org.apache.flink.connector.file.src.reader.StreamFormat;
@@ -81,7 +80,7 @@ public class FileSourceHeavyThroughputTest {
                final FileSource<byte[]> source = 
FileSource.forRecordStreamFormat(new ArrayReaderFormat(), path).build();
                final SourceReader<byte[], FileSourceSplit> reader = 
source.createReader(new NoOpReaderContext());
                reader.addSplits(Collections.singletonList(split));
-               reader.handleSourceEvents(new NoMoreSplitsEvent());
+               reader.notifyNoMoreSplits();
 
                final ReaderOutput<byte[]> out = new NoOpReaderOutput<>();
 
diff --git 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
index 02164e2..b3c6888 100644
--- 
a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
+++ 
b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/enumerator/KafkaSourceEnumerator.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
-import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import 
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
 import 
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscriber;
@@ -242,7 +241,7 @@ public class KafkaSourceEnumerator implements 
SplitEnumerator<KafkaPartitionSpli
                        if (noMoreNewPartitionSplits) {
                                LOG.debug("No more KafkaPartitionSplits to 
assign. Sending NoMoreSplitsEvent to the readers " +
                                        "in consumer group {}.", 
consumerGroupId);
-                               context.sendEventToSourceReader(readerOwner, 
new NoMoreSplitsEvent());
+                               context.signalNoMoreSplits(readerOwner);
                        }
                });
        }
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
index 0d37689..1258357 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java
@@ -94,13 +94,25 @@ public interface SourceReader<T, SplitT extends SourceSplit>
        void addSplits(List<SplitT> splits);
 
        /**
+        * This method is called when the reader is notified that it will not
+        * receive any further splits.
+        *
+        * <p>It is triggered when the enumerator calls {@link 
SplitEnumeratorContext#signalNoMoreSplits(int)}
+        * with the reader's parallel subtask.
+        */
+       void notifyNoMoreSplits();
+
+       /**
         * Handle a custom source event sent by the {@link SplitEnumerator}.
         * This method is called when the enumerator sends an event via
         * {@link SplitEnumeratorContext#sendEventToSourceReader(int, 
SourceEvent)}.
         *
+        * <p>This method has a default implementation that does nothing, 
because
+        * most sources do not require any custom events.
+        *
         * @param sourceEvent the event sent by the {@link SplitEnumerator}.
         */
-       void handleSourceEvents(SourceEvent sourceEvent);
+       default void handleSourceEvents(SourceEvent sourceEvent) {}
 
        /**
         * We have an empty default implementation here because most source 
readers do not have
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index 535501a..efd3627 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -84,6 +84,14 @@ public interface SplitEnumeratorContext<SplitT extends 
SourceSplit> {
        }
 
        /**
+        * Signals a subtask that it will not receive any further split.
+        *
+        * @param subtask The index of the operator's parallel subtask that 
shall be
+        *                signaled it will not receive any further split.
+        */
+       void signalNoMoreSplits(int subtask);
+
+       /**
         * Invoke the callable and handover the return value to the handler 
which will be executed
         * by the source coordinator. When this method is invoked multiple 
times, The <code>Coallble</code>s
         * may be executed in a thread pool concurrently.
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
index 91bc283..61e806d 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.connector.source.lib.util;
 import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.api.connector.source.event.RequestSplitEvent;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -66,7 +65,7 @@ public class IteratorSourceEnumerator<SplitT extends 
IteratorSourceSplit<?, ?>>
                if (nextSplit != null) {
                        context.assignSplit(nextSplit, subtaskId);
                } else {
-                       context.sendEventToSourceReader(subtaskId, new 
NoMoreSplitsEvent());
+                       context.signalNoMoreSplits(subtaskId);
                }
        }
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
index 3af75e1..5c6631e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
@@ -19,13 +19,10 @@
 package org.apache.flink.api.connector.source.lib.util;
 
 import org.apache.flink.api.connector.source.ReaderOutput;
-import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent;
 import org.apache.flink.api.connector.source.event.RequestSplitEvent;
 import org.apache.flink.core.io.InputStatus;
-import org.apache.flink.util.FlinkRuntimeException;
 
 import javax.annotation.Nullable;
 
@@ -122,6 +119,16 @@ public class IteratorSourceReader<E, IterT extends 
Iterator<E>, SplitT extends I
        }
 
        @Override
+       public void notifyNoMoreSplits() {
+               // if we get this after we already had a split, we must have 
requested more than
+               // one split, which is not expected here.
+               checkState(remainingSplits == null, "Unexpected response, 
requested more than one split.");
+
+               // non-null queue signals splits were assigned, in this case no 
splits
+               remainingSplits = new ArrayDeque<>();
+       }
+
+       @Override
        public List<SplitT> snapshotState(long checkpointId) {
                final ArrayList<SplitT> allSplits = new ArrayList<>(1 + 
remainingSplits.size());
                if (iterator != null) {
@@ -134,15 +141,5 @@ public class IteratorSourceReader<E, IterT extends 
Iterator<E>, SplitT extends I
        }
 
        @Override
-       public void handleSourceEvents(SourceEvent sourceEvent) {
-               if (sourceEvent instanceof NoMoreSplitsEvent) {
-                       // non-null queue signals splits were assigned, in this 
case no splits
-                       remainingSplits = new ArrayDeque<>();
-               } else {
-                       throw new FlinkRuntimeException("Unexpected event: " + 
sourceEvent);
-               }
-       }
-
-       @Override
        public void close() throws Exception {}
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
index b0a24dd..8eb4596 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java
@@ -108,12 +108,9 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
        }
 
        @Override
-       public void handleSourceEvents(SourceEvent sourceEvent) {
-               if (sourceEvent instanceof MockNoMoreSplitsEvent) {
-                       waitingForMoreSplits = false;
-                       markAvailable();
-               }
-               receivedSourceEvents.add(sourceEvent);
+       public void notifyNoMoreSplits() {
+               waitingForMoreSplits = false;
+               markAvailable();
        }
 
        @Override
@@ -137,6 +134,11 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
                }
        }
 
+       @Override
+       public void handleSourceEvents(SourceEvent sourceEvent) {
+               receivedSourceEvents.add(sourceEvent);
+       }
+
        // --------------- methods for unit tests ---------------
 
        public void markAvailable() {
@@ -174,10 +176,4 @@ public class MockSourceReader implements 
SourceReader<Integer, MockSourceSplit>
        public List<Long> getAbortedCheckpoints() {
                return abortedCheckpoints;
        }
-
-       /**
-        * Simple event allowing {@link MockSourceReader} to finish when 
requested.
-        */
-       public static class MockNoMoreSplitsEvent implements SourceEvent {
-       }
 }
diff --git 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java
 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java
index 6b2ba73..19f9ccc 100644
--- 
a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java
+++ 
b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumeratorContext.java
@@ -113,6 +113,9 @@ public class MockSplitEnumeratorContext<SplitT extends 
SourceSplit> implements S
        }
 
        @Override
+       public void signalNoMoreSplits(int subtask) {}
+
+       @Override
        public <T> void callAsync(Callable<T> callable, BiConsumer<T, 
Throwable> handler) {
                if (stoppedAcceptAsyncCalls.get()) {
                        return;
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index 50b50bb..b8e78db 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -31,6 +31,7 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.TaskNotRunningException;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -183,6 +184,19 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
        }
 
        @Override
+       public void signalNoMoreSplits(int subtask) {
+               // Ensure the split assignment is done by the the coordinator 
executor.
+               callInCoordinatorThread(() -> {
+                       try {
+                               operatorCoordinatorContext.sendEvent(new 
NoMoreSplitsEvent(), subtask);
+                               return null; // void return value
+                       } catch (TaskNotRunningException e) {
+                               throw new FlinkRuntimeException("Failed to send 
'NoMoreSplits' to reader " + subtask, e);
+                       }
+               }, "Failed to send 'NoMoreSplits' to reader " + subtask);
+       }
+
+       @Override
        public <T> void callAsync(
                        Callable<T> callable,
                        BiConsumer<T, Throwable> handler,
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java
similarity index 87%
rename from 
flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java
rename to 
flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java
index aa26808..18d8433 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.api.connector.source.event;
+package org.apache.flink.runtime.source.event;
 
-import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
 /**
  * A source event sent from the SplitEnumerator to the SourceReader to 
indicate that no more
  * splits will be assigned to the source reader anymore. So once the 
SplitReader finishes
  * reading the currently assigned splits, they can exit.
  */
-public class NoMoreSplitsEvent implements SourceEvent {
+public class NoMoreSplitsEvent implements OperatorEvent {
 
        private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index 5ffb930..80043cf 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -36,6 +36,7 @@ import 
org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.operators.coordination.OperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -277,6 +278,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
                        }
                } else if (event instanceof SourceEventWrapper) {
                        sourceReader.handleSourceEvents(((SourceEventWrapper) 
event).getSourceEvent());
+               } else if (event instanceof NoMoreSplitsEvent) {
+                       sourceReader.notifyNoMoreSplits();
                } else {
                        throw new IllegalStateException("Received unexpected 
operator event " + event);
                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
index d60934c..925c498 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.eventtime.Watermark;
 import org.apache.flink.api.common.eventtime.WatermarkStrategy;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.api.connector.source.ReaderOutput;
-import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.core.fs.CloseableRegistry;
@@ -293,7 +292,7 @@ public class SourceOperatorEventTimeTest {
                public void addSplits(List<MockSourceSplit> splits) {}
 
                @Override
-               public void handleSourceEvents(SourceEvent sourceEvent) {}
+               public void notifyNoMoreSplits() {}
 
                @Override
                public void close() {}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
index ecaeb89..f3b963a 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTaskTest.java
@@ -29,7 +29,6 @@ import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
 import org.apache.flink.api.connector.source.mocks.MockSource;
 import org.apache.flink.api.connector.source.mocks.MockSourceReader;
-import 
org.apache.flink.api.connector.source.mocks.MockSourceReader.MockNoMoreSplitsEvent;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplitSerializer;
 import org.apache.flink.metrics.Counter;
@@ -48,7 +47,7 @@ import 
org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
 import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
-import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractInput;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -98,6 +97,7 @@ import static org.junit.Assert.assertTrue;
  * Tests for {@link MultipleInputStreamTask}. Theses tests implicitly also 
test the
  * {@link StreamMultipleInputProcessor}.
  */
+@SuppressWarnings("serial")
 public class MultipleInputStreamTaskTest {
        private static final List<String> LIFE_CYCLE_EVENTS = new ArrayList<>();
 
@@ -869,7 +869,7 @@ public class MultipleInputStreamTaskTest {
        private void finishAddingRecords(StreamTaskMailboxTestHarness<String> 
testHarness, int sourceId) throws Exception {
                testHarness.getStreamTask().dispatchOperatorEvent(
                        getSourceOperatorID(testHarness, sourceId),
-                       new SerializedValue<>(new SourceEventWrapper(new 
MockNoMoreSplitsEvent())));
+                       new SerializedValue<>(new NoMoreSplitsEvent()));
        }
 
        static class LifeCycleTrackingMapToStringMultipleInputOperator
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index 8741478..f2a9fa9 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -328,8 +328,7 @@ public class UnalignedCheckpointITCase extends TestLogger {
                        }
 
                        @Override
-                       public void handleSourceEvents(SourceEvent sourceEvent) 
{
-                       }
+                       public void notifyNoMoreSplits() {}
 
                        @Override
                        public void close() throws Exception {

Reply via email to