This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2da7d1b7db0494a383b87d4cb563135aeea06a62 Author: Steven Wu <[email protected]> AuthorDate: Thu Nov 5 19:53:39 2020 -0800 [FLINK-19934][Connector] Add SplitEnumeratorContext.runInCoordinatorThread(Runnable) This closes #13955 --- .../api/connector/source/SplitEnumeratorContext.java | 16 ++++++++++++++++ .../source/coordinator/SourceCoordinatorContext.java | 5 +++++ 2 files changed, 21 insertions(+) diff --git a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java index c85700c..dbf8308 100644 --- a/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java +++ b/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumeratorContext.java @@ -116,4 +116,20 @@ public interface SplitEnumeratorContext<SplitT extends SourceSplit> { * @param period the period between two invocations of the callable. */ <T> void callAsync(Callable<T> callable, BiConsumer<T, Throwable> handler, long initialDelay, long period); + + /** + * Invoke the given runnable in the source coordinator thread. + * + * <p>This can be useful when the enumerator needs to execute some action (like assignSplits) triggered + * by some external events. E.g., the watermark from another source advanced and this source is now able to + * assign splits to awaiting readers. The trigger can be initiated from the coordinator thread of + * the other source. Instead of using a lock for thread safety, this API allows running such an externally + * triggered action in the coordinator thread. Hence, we can ensure all enumerator actions are serialized + * in the single coordinator thread. + * + * <p>It is important that the runnable does not block. 
+ * + * @param runnable a runnable to execute + */ + void runInCoordinatorThread(Runnable runnable); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java index c3b4666..062e2d4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/source/coordinator/SourceCoordinatorContext.java @@ -198,6 +198,11 @@ public class SourceCoordinatorContext<SplitT extends SourceSplit> } @Override + public void runInCoordinatorThread(Runnable runnable) { + coordinatorExecutor.execute(runnable); + } + + @Override public void close() throws InterruptedException { notifier.close(); coordinatorExecutor.shutdown();
