[hotfix] Abort restore when the procedure fails with a closed CloseableRegistry

This prevents exceptions caused by cancellation through the CloseableRegistry
from triggering unnecessary recovery attempts with alternative state.

(cherry picked from commit a7b54f1)


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f3d7733a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f3d7733a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f3d7733a

Branch: refs/heads/release-1.5
Commit: f3d7733aa30cb060ee7a091de5a8d6738a61f78b
Parents: f618c7e
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Mon Mar 12 22:43:47 2018 +0100
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Thu May 17 10:07:35 2018 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/operators/BackendRestorerProcedure.java  | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f3d7733a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
index dd75fb2..0f5b0e0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
@@ -125,6 +125,10 @@ public class BackendRestorerProcedure<
 
                                LOG.warn("Exception while restoring {} from alternative ({}/{}), will retry while more " +
                                        "alternatives are available.", logDescription, alternativeIdx, restoreOptions.size(), ex);
+
+                               if (backendCloseableRegistry.isClosed()) {
+                                       throw new FlinkException("Stopping restore attempts for already cancelled task.", collectedException);
+                               }
                        }
                }
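
The control flow this hunk introduces can be sketched in isolation. The
following is a minimal, self-contained illustration and not the Flink
implementation: RestoreRetrySketch, SimpleRegistry and
restoreFromAlternatives are hypothetical names, and IllegalStateException
stands in for FlinkException so that the sketch has no Flink dependency.
It assumes, as the commit message states, that a closed registry means
the task was cancelled and further alternatives should not be tried.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

class RestoreRetrySketch {

    /** Hypothetical stand-in for a closeable registry; only the closed flag matters here. */
    static final class SimpleRegistry implements AutoCloseable {
        private final AtomicBoolean closed = new AtomicBoolean(false);

        boolean isClosed() {
            return closed.get();
        }

        @Override
        public void close() {
            closed.set(true);
        }
    }

    /**
     * Tries each restore alternative in order and returns the first result.
     * If an attempt fails while the registry is already closed, the loop
     * stops immediately instead of moving on to the next alternative.
     */
    static <T> T restoreFromAlternatives(List<Callable<T>> alternatives, SimpleRegistry registry) {
        Exception collected = null;
        for (Callable<T> alternative : alternatives) {
            try {
                return alternative.call();
            } catch (Exception ex) {
                // Keep the first failure as the cause and attach later ones to it.
                if (collected == null) {
                    collected = ex;
                } else if (ex != collected) {
                    collected.addSuppressed(ex);
                }
                // A closed registry signals cancellation: do not retry.
                if (registry.isClosed()) {
                    throw new IllegalStateException(
                        "Stopping restore attempts for already cancelled task.", collected);
                }
            }
        }
        throw new IllegalStateException(
            "Could not restore from any of the provided alternatives.", collected);
    }

    public static void main(String[] args) {
        SimpleRegistry registry = new SimpleRegistry();
        Callable<String> failing = () -> {
            throw new java.io.IOException("first alternative is unreadable");
        };
        Callable<String> working = () -> "restored";
        // With an open registry, the failure of the first alternative is tolerated.
        System.out.println(restoreFromAlternatives(List.of(failing, working), registry));
    }
}
```

With an open registry the loop behaves as before and falls through to the
next alternative. Once the registry is closed, the first failure ends the
procedure, so a cancelled task does not spend time on restore attempts
that are going to fail for the same reason.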
 
