This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 57efc6980c4a2792e73b270a8dd39d95c9ed6de6
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Aug 24 16:06:04 2021 +0200

    [FLINK-23527][core] Clarify semantics of SourceFunction.cancel() with respect to thread interruptions.
---
 .../api/functions/source/SourceFunction.java | 26 +++++++++++++++++++++-----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 766a3ba..03ce071 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -123,11 +123,27 @@ public interface SourceFunction<T> extends Function, Serializable {
      * <p>A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
      * {@code false} in this method. That flag is checked in the loop condition.
      *
-     * <p>When a source is canceled, the executing thread will also be interrupted (via {@link
-     * Thread#interrupt()}). The interruption happens strictly after this method has been called, so
-     * any interruption handler can rely on the fact that this method has completed. It is good
-     * practice to make any flags altered by this method "volatile", in order to guarantee the
-     * visibility of the effects of this method to any interruption handler.
+     * <p>In case of an ungraceful shutdown (cancellation of the source operator, possibly for
+     * failover), the thread that calls {@link #run(SourceContext)} will also be {@link
+     * Thread#interrupt() interrupted}) by the Flink runtime, in order to speed up the cancellation
+     * (to ensure threads exit blocking methods fast, like I/O, blocking queues, etc.). The
+     * interruption happens strictly after this method has been called, so any interruption handler
+     * can rely on the fact that this method has completed (for example to ignore exceptions that
+     * happen after cancellation).
+     *
+     * <p>During graceful shutdown (for example stopping a job with a savepoint), the program must
+     * cleanly exit the {@link #run(SourceContext)} method soon after this method was called. The
+     * Flink runtime will NOT interrupt the source thread during graceful shutdown. Source
+     * implementors must ensure that no thread interruption happens on any thread that emits records
+     * through the {@code SourceContext} from the {@link #run(SourceContext)} method; otherwise the
+     * clean shutdown may fail when threads are interrupted while processing the final records.
+     *
+     * <p>Because the {@code SourceFunction} cannot easily differentiate whether the shutdown should
+     * be graceful or ungraceful, we recommend that implementors refrain from interrupting any
+     * threads that interact with the {@code SourceContext} at all. You can rely on the Flink
+     * runtime to interrupt the source thread in case of ungraceful cancellation. Any additionally
+     * spawned threads that directly emit records through the {@code SourceContext} should use a
+     * shutdown method that does not rely on thread interruption.
      */
     void cancel();