This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a8715f348c4ddf1fde243d8f89847b413027d350 Author: Stephan Ewen <[email protected]> AuthorDate: Mon Nov 9 00:27:34 2020 +0100 [FLINK-20049][core] Add built-in method to request split in source API. This replaces the custom event handling done by many source implementations. --- .../source/reader/mocks/MockSplitEnumerator.java | 11 +++++----- .../api/connector/source/SourceReaderContext.java | 7 +++++++ .../api/connector/source/SplitEnumerator.java | 24 +++++++++++++++++++--- .../source/lib/util/IteratorSourceEnumerator.java | 11 +++------- .../source/lib/util/IteratorSourceReader.java | 3 +-- .../source/mocks/MockSplitEnumerator.java | 5 +++++ .../source/mocks/TestingReaderContext.java | 3 +++ .../source/coordinator/SourceCoordinator.java | 8 +++++++- .../runtime}/source/event/RequestSplitEvent.java | 6 +++--- .../streaming/api/operators/SourceOperator.java | 6 ++++++ 10 files changed, 62 insertions(+), 22 deletions(-) diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java index 77da1ba..134a21f 100644 --- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java +++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java @@ -24,6 +24,8 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; import org.apache.flink.api.connector.source.mocks.MockSourceSplit; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.HashMap; @@ -45,14 +47,13 @@ public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Lis } @Override - public void start() { - - } + public void start() {} @Override - public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { + public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {} - } + @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {} @Override public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) { diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java index 4d9e53a..f1ea255 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java @@ -50,6 +50,13 @@ public interface SourceReaderContext { int getIndexOfSubtask(); /** + * Sends a split request to the source's {@link SplitEnumerator}. + * This will result in a call to the {@link SplitEnumerator#handleSplitRequest(int, String)} method, + * with this reader's parallel subtask id and the hostname where this reader runs. + */ + void sendSplitRequest(); + + /** * Send a source event to the source coordinator. * * @param sourceEvent the source event to coordinator. diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java index e97f5e1..ab6ce44 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java @@ -20,6 +20,8 @@ package org.apache.flink.api.connector.source; import org.apache.flink.annotation.PublicEvolving; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.List; @@ -40,12 +42,14 @@ public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> void start(); /** - * Handles the source event from the source reader. + * Handles the request for a split. This method is called when the reader with the given subtask + * id calls the {@link SourceReaderContext#sendSplitRequest()} method. * * @param subtaskId the subtask id of the source reader who sent the source event. - * @param sourceEvent the source event from the source reader. + * @param requesterHostname Optional, the hostname where the requesting task is running. + * This can be used to make split assignments locality-aware. */ - void handleSourceEvent(int subtaskId, SourceEvent sourceEvent); + void handleSplitRequest(int subtaskId, @Nullable String requesterHostname); /** * Add a split back to the split enumerator. It will only happen when a {@link SourceReader} fails @@ -91,4 +95,18 @@ public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT> * In 1.12, this default method is inherited from the CheckpointListener interface. */ default void notifyCheckpointAborted(long checkpointId) {} + + /** + * Handles a custom source event from the source reader. + * + * <p>This method has a default implementation that does nothing, because it is only + * required to be implemented by some sources, which have a custom event protocol between + * reader and enumerator. The common events for reader registration and split requests + * are not dispatched to this method, but rather invoke the {@link #addReader(int)} and + * {@link #handleSplitRequest(int, String)} methods. + * + * @param subtaskId the subtask id of the source reader who sent the source event. + * @param sourceEvent the source event from the source reader. + */ + default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {} } diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java index 61e806d..0a4efe3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java @@ -18,11 +18,10 @@ package org.apache.flink.api.connector.source.lib.util; -import org.apache.flink.api.connector.source.SourceEvent; import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; -import org.apache.flink.api.connector.source.event.RequestSplitEvent; -import org.apache.flink.util.FlinkRuntimeException; + +import javax.annotation.Nullable; import java.util.ArrayDeque; import java.util.Collection; @@ -56,11 +55,7 @@ public class IteratorSourceEnumerator<SplitT extends IteratorSourceSplit<?, ?>> public void close() {} @Override - public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { - if (!(sourceEvent instanceof RequestSplitEvent)) { - throw new FlinkRuntimeException("Unrecognized event: " + sourceEvent); - } - + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) { final SplitT nextSplit = remainingSplits.poll(); if (nextSplit != null) { context.assignSplit(nextSplit, subtaskId); diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java index 5c6631e..92ab054 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java @@ -21,7 +21,6 @@ package org.apache.flink.api.connector.source.lib.util; import org.apache.flink.api.connector.source.ReaderOutput; import org.apache.flink.api.connector.source.SourceReader; import org.apache.flink.api.connector.source.SourceReaderContext; -import org.apache.flink.api.connector.source.event.RequestSplitEvent; import org.apache.flink.core.io.InputStatus; import javax.annotation.Nullable; @@ -83,7 +82,7 @@ public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends I public void start() { // request a split only if we did not get one during restore if (iterator == null) { - context.sendSourceEventToCoordinator(new RequestSplitEvent()); + context.sendSplitRequest(); } } diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java index 0525533..bc034b0 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java @@ -23,6 +23,8 @@ import org.apache.flink.api.connector.source.SplitEnumerator; import org.apache.flink.api.connector.source.SplitEnumeratorContext; import org.apache.flink.api.connector.source.SplitsAssignment; +import javax.annotation.Nullable; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -71,6 +73,9 @@ public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Set } @Override + public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {} + + @Override public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) { handledSourceEvent.add(sourceEvent); } diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java index 3f254e4..98aad54 100644 --- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java +++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java @@ -69,6 +69,9 @@ public class TestingReaderContext implements SourceReaderContext { } @Override + public void sendSplitRequest() {} + + @Override public void sendSourceEventToCoordinator(SourceEvent sourceEvent) { sentEvents.add(sourceEvent); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java index 3e3af76..acbc93d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java @@ -30,7 +30,9 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.runtime.operators.coordination.OperatorEvent; import org.apache.flink.runtime.source.event.ReaderRegistrationEvent; +import org.apache.flink.runtime.source.event.RequestSplitEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; +import org.apache.flink.util.FlinkException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -134,10 +136,14 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements coordinatorExecutor.execute(() -> { try { LOG.debug("Handling event from subtask {} of source {}: {}", subtask, operatorName, event); - if (event instanceof SourceEventWrapper) { + if (event instanceof RequestSplitEvent) { + enumerator.handleSplitRequest(subtask, ((RequestSplitEvent) event).hostName()); + } else if (event instanceof SourceEventWrapper) { enumerator.handleSourceEvent(subtask, ((SourceEventWrapper) event).getSourceEvent()); } else if (event instanceof ReaderRegistrationEvent) { handleReaderRegistrationEvent((ReaderRegistrationEvent) event); + } else { + throw new FlinkException("Unrecognized Operator Event: " + event); } } catch (Exception e) { LOG.error("Failing the job due to exception when handling operator event {} from subtask {} " + diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/RequestSplitEvent.java similarity index 92% rename from flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/source/event/RequestSplitEvent.java index 9ff2293..6e7166b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/RequestSplitEvent.java @@ -16,9 +16,9 @@ * limitations under the License. */ -package org.apache.flink.api.connector.source.event; +package org.apache.flink.runtime.source.event; -import org.apache.flink.api.connector.source.SourceEvent; +import org.apache.flink.runtime.operators.coordination.OperatorEvent; import javax.annotation.Nullable; @@ -30,7 +30,7 @@ import java.util.Objects; * <p>This event optionally carries the hostname of the location where the reader runs, to support * locality-aware work assignment. */ -public final class RequestSplitEvent implements SourceEvent { +public final class RequestSplitEvent implements OperatorEvent { private static final long serialVersionUID = 1L; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java index bfe51e2..8ac54ea 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventHandler; import org.apache.flink.runtime.source.event.AddSplitEvent; import org.apache.flink.runtime.source.event.NoMoreSplitsEvent; import org.apache.flink.runtime.source.event.ReaderRegistrationEvent; +import org.apache.flink.runtime.source.event.RequestSplitEvent; import org.apache.flink.runtime.source.event.SourceEventWrapper; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; @@ -157,6 +158,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit> } @Override + public void sendSplitRequest() { + operatorEventGateway.sendEventToCoordinator(new RequestSplitEvent(getLocalHostName())); + } + + @Override public void sendSourceEventToCoordinator(SourceEvent event) { operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event)); }
