This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2da7d1b7db0494a383b87d4cb563135aeea06a62
Author: Steven Wu <[email protected]>
AuthorDate: Thu Nov 5 19:53:39 2020 -0800

    [FLINK-19934][Connector] Add 
SplitEnumeratorContext.runInCoordinatorThread(Runnable)
    
    This closes #13955
---
 .../api/connector/source/SplitEnumeratorContext.java     | 16 ++++++++++++++++
 .../source/coordinator/SourceCoordinatorContext.java     |  5 +++++
 2 files changed, 21 insertions(+)

diff --git 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
index c85700c..dbf8308 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java
@@ -116,4 +116,20 @@ public interface SplitEnumeratorContext<SplitT extends 
SourceSplit> {
         * @param period the period between two invocations of the callable.
         */
        <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> 
handler, long initialDelay, long period);
+
+       /**
+        * Invoke the given runnable in the source coordinator thread.
+        *
+        * <p>This can be useful when the enumerator needs to execute some 
action (like assignSplits) triggered
+        * by some external events. E.g., Watermark from another source 
advanced and this source now be able to
+        * assign splits to awaiting readers. The trigger can be initiated from 
the coordinator thread of
+        * the other source. Instead of using lock for thread safety, this API 
allows to run such externally
+        * triggered action in the coordinator thread. Hence, we can ensure all 
enumerator actions are serialized
+        * in the single coordinator thread.
+        *
+        * <p>It is important that the runnable does not block.
+        *
+        * @param runnable a runnable to execute
+        */
+       void runInCoordinatorThread(Runnable runnable);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
index c3b4666..062e2d4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java
@@ -198,6 +198,11 @@ public class SourceCoordinatorContext<SplitT extends 
SourceSplit>
        }
 
        @Override
+       public void runInCoordinatorThread(Runnable runnable) {
+               coordinatorExecutor.execute(runnable);
+       }
+
+       @Override
        public void close() throws InterruptedException {
                notifier.close();
                coordinatorExecutor.shutdown();

Reply via email to