This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 57efc6980c4a2792e73b270a8dd39d95c9ed6de6
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Aug 24 16:06:04 2021 +0200

    [FLINK-23527][core] Clarify semantics of SourceFunction.cancel() with respect to thread interruptions.
---
 .../api/functions/source/SourceFunction.java       | 26 +++++++++++++++++-----
 1 file changed, 21 insertions(+), 5 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
index 766a3ba..03ce071 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java
@@ -123,11 +123,27 @@ public interface SourceFunction<T> extends Function, Serializable {
      * <p>A typical pattern is to have an {@code "volatile boolean isRunning"} flag that is set to
      * {@code false} in this method. That flag is checked in the loop condition.
      *
-     * <p>When a source is canceled, the executing thread will also be interrupted (via {@link
-     * Thread#interrupt()}). The interruption happens strictly after this method has been called, so
-     * any interruption handler can rely on the fact that this method has completed. It is good
-     * practice to make any flags altered by this method "volatile", in order to guarantee the
-     * visibility of the effects of this method to any interruption handler.
+     * <p>In case of an ungraceful shutdown (cancellation of the source operator, possibly for
+     * failover), the thread that calls {@link #run(SourceContext)} will also be {@link
+     * Thread#interrupt() interrupted}) by the Flink runtime, in order to speed up the cancellation
+     * (to ensure threads exit blocking methods fast, like I/O, blocking queues, etc.). The
+     * interruption happens strictly after this method has been called, so any interruption handler
+     * can rely on the fact that this method has completed (for example to ignore exceptions that
+     * happen after cancellation).
+     *
+     * <p>During graceful shutdown (for example stopping a job with a savepoint), the program must
+     * cleanly exit the {@link #run(SourceContext)} method soon after this method was called. The
+     * Flink runtime will NOT interrupt the source thread during graceful shutdown. Source
+     * implementors must ensure that no thread interruption happens on any thread that emits records
+     * through the {@code SourceContext} from the {@link #run(SourceContext)} method; otherwise the
+     * clean shutdown may fail when threads are interrupted while processing the final records.
+     *
+     * <p>Because the {@code SourceFunction} cannot easily differentiate whether the shutdown should
+     * be graceful or ungraceful, we recommend that implementors refrain from interrupting any
+     * threads that interact with the {@code SourceContext} at all. You can rely on the Flink
+     * runtime to interrupt the source thread in case of ungraceful cancellation. Any additionally
+     * spawned threads that directly emit records through the {@code SourceContext} should use a
+     * shutdown method that does not rely on thread interruption.
      */
     void cancel();
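
The pattern described in the Javadoc above can be sketched roughly as follows. This is an illustration only and not code from the commit: to keep it runnable without Flink on the classpath, Context is a hypothetical stand-in for SourceContext, and CountingSource and runThenCancel are made-up names. The point it tries to show is that cancel() only flips a volatile flag and never calls Thread.interrupt(), so run() exits cleanly on its own; any interruption is left to the runtime.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class CancelFlagSketch {

    /** Hypothetical stand-in for Flink's SourceContext; only collect() is modeled. */
    interface Context<T> {
        void collect(T record);
    }

    /** A source that follows the "volatile boolean isRunning" pattern. */
    static class CountingSource {
        private volatile boolean isRunning = true;

        void run(Context<Long> ctx) throws InterruptedException {
            long next = 0;
            while (isRunning) {
                ctx.collect(next++);
                // Blocking call: an interrupt issued by the runtime during
                // ungraceful cancellation would surface here as an exception.
                Thread.sleep(1);
            }
        }

        void cancel() {
            // Only flip the flag. The emitting thread is deliberately not interrupted.
            isRunning = false;
        }
    }

    /** Runs the source on its own thread, cancels it after a delay, returns what was emitted. */
    static List<Long> runThenCancel(long delayMillis) throws InterruptedException {
        List<Long> out = Collections.synchronizedList(new ArrayList<>());
        CountingSource source = new CountingSource();
        Thread runner = new Thread(() -> {
            try {
                source.run(out::add);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // not expected in this sketch
            }
        });
        runner.start();
        Thread.sleep(delayMillis);
        source.cancel();
        runner.join(5_000); // wait for run() to return by itself
        if (runner.isAlive()) {
            throw new IllegalStateException("run() did not exit after cancel()");
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        List<Long> records = runThenCancel(50);
        System.out.println("emitted " + records.size() + " records, no interrupt used");
    }
}
```

A source that spawns extra emitter threads could presumably reuse the same flag (or a similar non-interrupting signal) to stop them, in line with the recommendation in the last paragraph of the Javadoc.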
 
