[hotfix] Clear interrupted flag in stream task cancellation

We clear the interrupted flag before the cleanup code block of task 
cancellation.
Otherwise, code that would like to wait until services are properly shutdown 
will
always immediately return from calls that are supposed to be blocking waits.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9a583b7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9a583b7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9a583b7

Branch: refs/heads/master
Commit: f9a583b727c9aecbec3213b12266f1d598223400
Parents: a8fc3b1
Author: Stefan Richter <[email protected]>
Authored: Fri Feb 23 18:21:24 2018 +0100
Committer: Stefan Richter <[email protected]>
Committed: Sun Feb 25 15:59:55 2018 +0100

----------------------------------------------------------------------
 .../java/org/apache/flink/streaming/runtime/tasks/StreamTask.java | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f9a583b7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 7ebbc71..55d0132 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -348,6 +348,9 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
                        // clean up everything we initialized
                        isRunning = false;
 
+                       // clear the interrupted status so that we can wait for 
the following resource shutdowns to complete
+                       Thread.interrupted();
+
                        // stop all timers and threads
                        if (timerService != null && 
!timerService.isTerminated()) {
                                try {

Reply via email to