[hotfix] Clear interrupted flag in stream task cancellation We clear the interrupted flag before the cleanup code block of task cancellation. Otherwise, code that would like to wait until services are properly shutdown will always immediately return from calls that are supposed to be blocking waits.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f9a583b7 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f9a583b7 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f9a583b7 Branch: refs/heads/master Commit: f9a583b727c9aecbec3213b12266f1d598223400 Parents: a8fc3b1 Author: Stefan Richter <[email protected]> Authored: Fri Feb 23 18:21:24 2018 +0100 Committer: Stefan Richter <[email protected]> Committed: Sun Feb 25 15:59:55 2018 +0100 ---------------------------------------------------------------------- .../java/org/apache/flink/streaming/runtime/tasks/StreamTask.java | 3 +++ 1 file changed, 3 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f9a583b7/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 7ebbc71..55d0132 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -348,6 +348,9 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // clean up everything we initialized isRunning = false; + // clear the interrupted status so that we can wait for the following resource shutdowns to complete + Thread.interrupted(); + // stop all timers and threads if (timerService != null && !timerService.isTerminated()) { try {
