This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5b0cacf9719320a07cb36e1bafeea9c9ae3409cd Author: Stephan Ewen <[email protected]> AuthorDate: Wed Nov 4 22:22:30 2020 +0100 [FLINK-19265][core] Add to source coordinator built-in methods to signal "no more splits". This replaces custom events and event handling implemented by many sources. --- .../base/source/reader/SourceReaderBase.java | 17 ++++++++-------- .../base/source/reader/SourceReaderBaseTest.java | 9 ++++----- .../source/reader/mocks/MockSplitEnumerator.java | 3 +-- .../flink/api/connector/source/SourceReader.java | 14 ++++++++++++- .../connector/source/SplitEnumeratorContext.java | 8 ++++++++ .../source/lib/util/IteratorSourceEnumerator.java | 3 +-- .../source/lib/util/IteratorSourceReader.java | 23 ++++++++++------------ .../connector/source/mocks/MockSourceReader.java | 20 ++++++++----------- .../coordinator/SourceCoordinatorContext.java | 14 +++++++++++++ .../runtime}/source/event/NoMoreSplitsEvent.java | 6 +++--- .../streaming/api/operators/SourceOperator.java | 3 +++ .../source/SourceOperatorEventTimeTest.java | 3 +-- 12 files changed, 74 insertions(+), 49 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java index 503e6c9..430f629 100644 --- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java +++ b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/source/reader/SourceReaderBase.java @@ -24,7 +24,6 @@ import org.apache.flink.api.connector.source.SourceOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; import org.apache.flink.api.connector.source.SourceSplit; -import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import 
org.apache.flink.configuration.Configuration; import org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager; import org.apache.flink.connector.base.source.reader.splitreader.SplitReader; @@ -220,13 +219,15 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt } @Override + public void notifyNoMoreSplits() { + LOG.info("Reader received NoMoreSplits event."); + noMoreSplitsAssignment = true; + elementsQueue.notifyAvailable(); + } + + @Override public void handleSourceEvents(SourceEvent sourceEvent) { - LOG.trace("Handling source event: {}", sourceEvent); - if (sourceEvent instanceof NoMoreSplitsEvent) { - LOG.info("Reader received NoMoreSplits event."); - noMoreSplitsAssignment = true; - elementsQueue.notifyAvailable(); - } + LOG.info("Received unhandled source event: {}", sourceEvent); } @Override @@ -235,8 +236,6 @@ public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitSt splitFetcherManager.close(options.sourceReaderCloseTimeout); } - - // -------------------- Abstract method to allow different implementations ------------------ /** * Handles the finished splits to clean the state if needed. 
diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java index 3efe222..764b203 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/SourceReaderBaseTest.java @@ -21,7 +21,6 @@ package org.apache.flink.connector.base.source.reader; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceSplit; -import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.api.connector.source.mocks.TestingReaderContext; import org.apache.flink.api.connector.source.mocks.TestingReaderOutput; @@ -95,7 +94,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> reader.addSplits(Collections.singletonList(getSplit(0, NUM_RECORDS_PER_SPLIT, Boundedness.CONTINUOUS_UNBOUNDED))); - reader.handleSourceEvents(new NoMoreSplitsEvent()); + reader.notifyNoMoreSplits(); // This is not a real infinite loop, it is supposed to throw exception after two polls. 
while (true) { InputStatus inputStatus = reader.pollNext(output); @@ -151,7 +150,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> getSplit(1, 12, Boundedness.BOUNDED) ); reader.addSplits(splits); - reader.handleSourceEvents(new NoMoreSplitsEvent()); + reader.notifyNoMoreSplits(); while (true) { InputStatus status = reader.pollNext(new TestingReaderOutput<>()); @@ -185,7 +184,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> getSplit(1, 10, Boundedness.BOUNDED) ); reader.addSplits(splits); - reader.handleSourceEvents(new NoMoreSplitsEvent()); + reader.notifyNoMoreSplits(); while (true) { InputStatus status = reader.pollNext(new TestingReaderOutput<>()); @@ -216,7 +215,7 @@ public class SourceReaderBaseTest extends SourceReaderTestBase<MockSourceSplit> // Create and add a split that only contains one record final MockSourceSplit split = new MockSourceSplit(0, 0, 1); sourceReader.addSplits(Collections.singletonList(split)); - sourceReader.handleSourceEvents(new NoMoreSplitsEvent()); + sourceReader.notifyNoMoreSplits(); // Add the last record to the split when the splitFetcherManager shutting down SplitFetchers splitFetcherManager.getInShutdownSplitFetcherFuture().thenRun(() -> split.addRecord(1)); diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java index 8df3394..77da1ba 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java @@ -22,7 +22,6 @@ import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; 
import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; -import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import java.io.IOException; @@ -73,7 +72,7 @@ public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Lis context.assignSplits(new SplitsAssignment<>(assignment)); splits.clear(); for (int i = 0; i < numReaders; i++) { - context.sendEventToSourceReader(i, new NoMoreSplitsEvent()); + context.signalNoMoreSplits(i); } } } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java index 0aac1b1..6dad2d5 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReader.java @@ -74,11 +74,23 @@ public interface SourceReader<T, SplitT extends SourceSplit> void addSplits(List<SplitT> splits); /** + * This method is called when the reader is notified that it will not + * receive any further splits. + * + * <p>It is triggered when the enumerator calls {@link SplitEnumeratorContext#signalNoMoreSplits(int)} + * with the reader's parallel subtask. + */ + void notifyNoMoreSplits(); + + /** * Handle a source event sent by the {@link SplitEnumerator}. * + * <p>This method has a default implementation that does nothing, because + * most sources do not require any custom events. + * * @param sourceEvent the event sent by the {@link SplitEnumerator}. 
*/ - void handleSourceEvents(SourceEvent sourceEvent); + default void handleSourceEvents(SourceEvent sourceEvent) {} /** * We have an empty default implementation here because most source readers do not have diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java index dbf8308..9ce47ef 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java @@ -84,6 +84,14 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> { } /** + * Signals a subtask that it will not receive any further splits. + * + * @param subtask The index of the operator's parallel subtask that shall be + * signaled that it will not receive any further splits. + */ + void signalNoMoreSplits(int subtask); + + /** * Invoke the callable and handover the return value to the handler which will be executed * by the source coordinator. When this method is invoked multiple times, The <code>Coallble</code>s * may be executed in a thread pool concurrently.
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java index 91bc283..61e806d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java @@ -21,7 +21,6 @@ package org.apache.flink.api.connector.source.lib.util; import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.api.connector.source.event.RequestSplitEvent; import org.apache.flink.util.FlinkRuntimeException; @@ -66,7 +65,7 @@ public class IteratorSourceEnumerator<SplitT extends IteratorSourceSplit<?, ?>> if (nextSplit != null) { context.assignSplit(nextSplit, subtaskId); } else { - context.sendEventToSourceReader(subtaskId, new NoMoreSplitsEvent()); + context.signalNoMoreSplits(subtaskId); } } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java index 3af75e1..5c6631e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java @@ -19,13 +19,10 @@ package org.apache.flink.api.connector.source.lib.util; import org.apache.flink.api.connector.source.ReaderOutput; -import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; -import 
org.apache.flink.api.connector.source.event.NoMoreSplitsEvent; import org.apache.flink.api.connector.source.event.RequestSplitEvent; import org.apache.flink.core.io.InputStatus; -import org.apache.flink.util.FlinkRuntimeException; import javax.annotation.Nullable; @@ -122,6 +119,16 @@ public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends I } @Override + public void notifyNoMoreSplits() { + // if we get this after we already had a split, we must have requested more than + // one split, which is not expected here. + checkState(remainingSplits == null, "Unexpected response, requested more than one split."); + + // non-null queue signals splits were assigned, in this case no splits + remainingSplits = new ArrayDeque<>(); + } + + @Override public List<SplitT> snapshotState(long checkpointId) { final ArrayList<SplitT> allSplits = new ArrayList<>(1 + remainingSplits.size()); if (iterator != null) { @@ -134,15 +141,5 @@ public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends I } @Override - public void handleSourceEvents(SourceEvent sourceEvent) { - if (sourceEvent instanceof NoMoreSplitsEvent) { - // non-null queue signals splits were assigned, in this case no splits - remainingSplits = new ArrayDeque<>(); - } else { - throw new FlinkRuntimeException("Unexpected event: " + sourceEvent); - } - } - - @Override public void close() throws Exception {} } diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java index b0a24dd..8eb4596 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSourceReader.java @@ -108,12 +108,9 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> } @Override - public void handleSourceEvents(SourceEvent 
sourceEvent) { - if (sourceEvent instanceof MockNoMoreSplitsEvent) { - waitingForMoreSplits = false; - markAvailable(); - } - receivedSourceEvents.add(sourceEvent); + public void notifyNoMoreSplits() { + waitingForMoreSplits = false; + markAvailable(); } @Override @@ -137,6 +134,11 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> } } + @Override + public void handleSourceEvents(SourceEvent sourceEvent) { + receivedSourceEvents.add(sourceEvent); + } + // --------------- methods for unit tests --------------- public void markAvailable() { @@ -174,10 +176,4 @@ public class MockSourceReader implements SourceReader<Integer, MockSourceSplit> public List<Long> getAbortedCheckpoints() { return abortedCheckpoints; } - - /** - * Simple event allowing {@link MockSourceReader} to finish when requested. - */ - public static class MockNoMoreSplitsEvent implements SourceEvent { - } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java index 062e2d4..6141688 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.TaskNotRunningException; import org.apache.flink.runtime.source.event.AddSplitEvent; +import org.apache.flink.runtime.source.event.NoMoreSplitsEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.Preconditions; @@ -184,6 +185,19 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> } 
@Override + public void signalNoMoreSplits(int subtask) { + // Ensure the 'NoMoreSplits' event is sent by the coordinator executor. + callInCoordinatorThread(() -> { + try { + operatorCoordinatorContext.sendEvent(new NoMoreSplitsEvent(), subtask); + return null; // void return value + } catch (TaskNotRunningException e) { + throw new FlinkRuntimeException("Failed to send 'NoMoreSplits' to reader " + subtask, e); + } + }, "Failed to send 'NoMoreSplits' to reader " + subtask); + } + + @Override public <T> void callAsync( Callable<T> callable, BiConsumer<T, Throwable> handler, diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java similarity index 87% rename from flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java index aa26808..18d8433 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/NoMoreSplitsEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/NoMoreSplitsEvent.java @@ -16,16 +16,16 @@ * limitations under the License. */ -package org.apache.flink.api.connector.source.event; +package org.apache.flink.runtime.source.event; -import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; /** * A source event sent from the SplitEnumerator to the SourceReader to indicate that no more * splits will be assigned to the source reader anymore. So once the SplitReader finishes * reading the currently assigned splits, they can exit.
*/ -public class NoMoreSplitsEvent implements SourceEvent { +public class NoMoreSplitsEvent implements OperatorEvent { private static final long serialVersionUID = 1L; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index 799b2a1..bfe51e2 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.operators.coordination.OperatorEventGateway; import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; import org.apache.flink.runtime.source.event.AddSplitEvent; +import org.apache.flink.runtime.source.event.NoMoreSplitsEvent; import org.apache.flink.runtime.source.event.ReaderRegistrationEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; import org.apache.flink.runtime.state.StateInitializationContext; @@ -264,6 +265,8 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> } } else if (event instanceof SourceEventWrapper) { sourceReader.handleSourceEvents(((SourceEventWrapper) event).getSourceEvent()); + } else if (event instanceof NoMoreSplitsEvent) { + sourceReader.notifyNoMoreSplits(); } else { throw new IllegalStateException("Received unexpected operator event " + event); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java index d48ed13..d47d309 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/source/SourceOperatorEventTimeTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.eventtime.Watermark; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.connector.source.ReaderOutput; -import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; import org.apache.flink.core.fs.CloseableRegistry; @@ -252,7 +251,7 @@ public class SourceOperatorEventTimeTest { public void addSplits(List<MockSourceSplit> splits) {} @Override - public void handleSourceEvents(SourceEvent sourceEvent) {} + public void notifyNoMoreSplits() {} @Override public void close() {}
