This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8715f348c4ddf1fde243d8f89847b413027d350
Author: Stephan Ewen <[email protected]>
AuthorDate: Mon Nov 9 00:27:34 2020 +0100

    [FLINK-20049][core] Add built-in method to request split in source API.
    
    This replaces the custom event handling done by many source implementations.
---
 .../source/reader/mocks/MockSplitEnumerator.java   | 11 +++++-----
 .../api/connector/source/SourceReaderContext.java  |  7 +++++++
 .../api/connector/source/SplitEnumerator.java      | 24 +++++++++++++++++++---
 .../source/lib/util/IteratorSourceEnumerator.java  | 11 +++-------
 .../source/lib/util/IteratorSourceReader.java      |  3 +--
 .../source/mocks/MockSplitEnumerator.java          |  5 +++++
 .../source/mocks/TestingReaderContext.java         |  3 +++
 .../source/coordinator/SourceCoordinator.java      |  8 +++++++-
 .../runtime}/source/event/RequestSplitEvent.java   |  6 +++---
 .../streaming/api/operators/SourceOperator.java    |  6 ++++++
 10 files changed, 62 insertions(+), 22 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
index 77da1ba..134a21f 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/mocks/MockSplitEnumerator.java
@@ -24,6 +24,8 @@ import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
 import org.apache.flink.api.connector.source.mocks.MockSourceSplit;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -45,14 +47,13 @@ public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Lis
        }
 
        @Override
-       public void start() {
-
-       }
+       public void start() {}
 
        @Override
-       public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+       public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}
 
-       }
+       @Override
+       public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}
 
        @Override
        public void addSplitsBack(List<MockSourceSplit> splits, int subtaskId) {
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
index 4d9e53a..f1ea255 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceReaderContext.java
@@ -50,6 +50,13 @@ public interface SourceReaderContext {
        int getIndexOfSubtask();
 
        /**
+        * Sends a split request to the source's {@link SplitEnumerator}.
+        * This will result in a call to the {@link SplitEnumerator#handleSplitRequest(int, String)} method,
+        * with this reader's parallel subtask id and the hostname where this reader runs.
+        */
+       void sendSplitRequest();
+
+       /**
         * Send a source event to the source coordinator.
         *
         * @param sourceEvent the source event to coordinator.
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
index e97f5e1..ab6ce44 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java
@@ -20,6 +20,8 @@ package org.apache.flink.api.connector.source;
 
 import org.apache.flink.annotation.PublicEvolving;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.List;
 
@@ -40,12 +42,14 @@ public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT>
        void start();
 
        /**
-        * Handles the source event from the source reader.
+        * Handles the request for a split. This method is called when the reader with the given subtask
+        * id calls the {@link SourceReaderContext#sendSplitRequest()} method.
         *
         * @param subtaskId the subtask id of the source reader who sent the source event.
-        * @param sourceEvent the source event from the source reader.
+        * @param requesterHostname Optional, the hostname where the requesting task is running.
+        *                          This can be used to make split assignments locality-aware.
         */
-       void handleSourceEvent(int subtaskId, SourceEvent sourceEvent);
+       void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);
 
        /**
         * Add a split back to the split enumerator. It will only happen when a {@link SourceReader} fails
@@ -91,4 +95,18 @@ public interface SplitEnumerator<SplitT extends SourceSplit, CheckpointT>
         * In 1.12, this default method is inherited from the CheckpointListener interface.
         */
        default void notifyCheckpointAborted(long checkpointId) {}
+
+       /**
+        * Handles a custom source event from the source reader.
+        *
+        * <p>This method has a default implementation that does nothing, because it is only
+        * required to be implemented by some sources, which have a custom event protocol between
+        * reader and enumerator. The common events for reader registration and split requests
+        * are not dispatched to this method, but rather invoke the {@link #addReader(int)} and
+        * {@link #handleSplitRequest(int, String)} methods.
+        *
+        * @param subtaskId the subtask id of the source reader who sent the source event.
+        * @param sourceEvent the source event from the source reader.
+        */
+       default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {}
 }
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
index 61e806d..0a4efe3 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceEnumerator.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.api.connector.source.lib.util;
 
-import org.apache.flink.api.connector.source.SourceEvent;
 import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
-import org.apache.flink.api.connector.source.event.RequestSplitEvent;
-import org.apache.flink.util.FlinkRuntimeException;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayDeque;
 import java.util.Collection;
@@ -56,11 +55,7 @@ public class IteratorSourceEnumerator<SplitT extends IteratorSourceSplit<?, ?>>
        public void close() {}
 
        @Override
-       public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
-               if (!(sourceEvent instanceof RequestSplitEvent)) {
-                       throw new FlinkRuntimeException("Unrecognized event: " + sourceEvent);
-               }
-
+       public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
                final SplitT nextSplit = remainingSplits.poll();
                if (nextSplit != null) {
                        context.assignSplit(nextSplit, subtaskId);
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
index 5c6631e..92ab054 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
+++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/lib/util/IteratorSourceReader.java
@@ -21,7 +21,6 @@ package org.apache.flink.api.connector.source.lib.util;
 import org.apache.flink.api.connector.source.ReaderOutput;
 import org.apache.flink.api.connector.source.SourceReader;
 import org.apache.flink.api.connector.source.SourceReaderContext;
-import org.apache.flink.api.connector.source.event.RequestSplitEvent;
 import org.apache.flink.core.io.InputStatus;
 
 import javax.annotation.Nullable;
@@ -83,7 +82,7 @@ public class IteratorSourceReader<E, IterT extends Iterator<E>, SplitT extends I
        public void start() {
                // request a split only if we did not get one during restore
                if (iterator == null) {
-                       context.sendSourceEventToCoordinator(new RequestSplitEvent());
+                       context.sendSplitRequest();
                }
        }
 
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
index 0525533..bc034b0 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/MockSplitEnumerator.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.connector.source.SplitEnumerator;
 import org.apache.flink.api.connector.source.SplitEnumeratorContext;
 import org.apache.flink.api.connector.source.SplitsAssignment;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -71,6 +73,9 @@ public class MockSplitEnumerator implements SplitEnumerator<MockSourceSplit, Set
        }
 
        @Override
+       public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {}
+
+       @Override
        public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
                handledSourceEvent.add(sourceEvent);
        }
diff --git a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java
index 3f254e4..98aad54 100644
--- a/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java
+++ b/flink-core/src/test/java/org/apache/flink/api/connector/source/mocks/TestingReaderContext.java
@@ -69,6 +69,9 @@ public class TestingReaderContext implements SourceReaderContext {
        }
 
        @Override
+       public void sendSplitRequest() {}
+
+       @Override
        public void sendSourceEventToCoordinator(SourceEvent sourceEvent) {
                sentEvents.add(sourceEvent);
        }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
index 3e3af76..acbc93d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinator.java
@@ -30,7 +30,9 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.RequestSplitEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
+import org.apache.flink.util.FlinkException;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -134,10 +136,14 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT> implements
                coordinatorExecutor.execute(() -> {
                        try {
                                LOG.debug("Handling event from subtask {} of source {}: {}", subtask, operatorName, event);
-                               if (event instanceof SourceEventWrapper) {
+                               if (event instanceof RequestSplitEvent) {
+                                       enumerator.handleSplitRequest(subtask, ((RequestSplitEvent) event).hostName());
+                               } else if (event instanceof SourceEventWrapper) {
                                        enumerator.handleSourceEvent(subtask, ((SourceEventWrapper) event).getSourceEvent());
                                } else if (event instanceof ReaderRegistrationEvent) {
                                        handleReaderRegistrationEvent((ReaderRegistrationEvent) event);
+                               } else {
+                                       throw new FlinkException("Unrecognized Operator Event: " + event);
                                }
                        } catch (Exception e) {
                                LOG.error("Failing the job due to exception when handling operator event {} from subtask {} " +
diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/RequestSplitEvent.java
similarity index 92%
rename from flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/source/event/RequestSplitEvent.java
index 9ff2293..6e7166b 100644
--- a/flink-core/src/main/java/org/apache/flink/api/connector/source/event/RequestSplitEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/event/RequestSplitEvent.java
@@ -16,9 +16,9 @@
  * limitations under the License.
  */
 
-package org.apache.flink.api.connector.source.event;
+package org.apache.flink.runtime.source.event;
 
-import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.runtime.operators.coordination.OperatorEvent;
 
 import javax.annotation.Nullable;
 
@@ -30,7 +30,7 @@ import java.util.Objects;
  * <p>This event optionally carries the hostname of the location where the reader runs, to support
  * locality-aware work assignment.
  */
-public final class RequestSplitEvent implements SourceEvent {
+public final class RequestSplitEvent implements OperatorEvent {
 
        private static final long serialVersionUID = 1L;
 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
index bfe51e2..8ac54ea 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperator.java
@@ -38,6 +38,7 @@ import org.apache.flink.runtime.operators.coordination.OperatorEventHandler;
 import org.apache.flink.runtime.source.event.AddSplitEvent;
 import org.apache.flink.runtime.source.event.NoMoreSplitsEvent;
 import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
+import org.apache.flink.runtime.source.event.RequestSplitEvent;
 import org.apache.flink.runtime.source.event.SourceEventWrapper;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -157,6 +158,11 @@ public class SourceOperator<OUT, SplitT extends SourceSplit>
                        }
 
                        @Override
+                       public void sendSplitRequest() {
+                               operatorEventGateway.sendEventToCoordinator(new RequestSplitEvent(getLocalHostName()));
+                       }
+
+                       @Override
                        public void sendSourceEventToCoordinator(SourceEvent event) {
                                operatorEventGateway.sendEventToCoordinator(new SourceEventWrapper(event));
                        }

Reply via email to