Repository: flink
Updated Branches:
  refs/heads/release-1.5 69ff5a744 -> 11c30c8cc


[hotfix] [flip6] Harden JobMaster#triggerSavepoint

Check first whether the CheckpointCoordinator has been set before triggering
a savepoint. If it has not been set, then return a failure message.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bce18dde
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bce18dde
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bce18dde

Branch: refs/heads/release-1.5
Commit: bce18dded573d93bf7b1ea4b6633af87846ad493
Parents: 2532b11
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Mar 1 20:09:55 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sat Mar 3 00:07:51 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/jobmaster/JobMaster.java | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bce18dde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index bfe8aaa..cc4cdba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -971,14 +971,20 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
        @Override
        public CompletableFuture<String> triggerSavepoint(
-               @Nullable final String targetDirectory,
-               final Time timeout) {
-               try {
-                       return executionGraph.getCheckpointCoordinator()
+                       @Nullable final String targetDirectory,
+                       final Time timeout) {
+               final CheckpointCoordinator checkpointCoordinator = 
executionGraph.getCheckpointCoordinator();
+
+               if (checkpointCoordinator != null) {
+                       return checkpointCoordinator
                                .triggerSavepoint(System.currentTimeMillis(), 
targetDirectory)
                                
.thenApply(CompletedCheckpoint::getExternalPointer);
-               } catch (Exception e) {
-                       return FutureUtils.completedExceptionally(e);
+               } else {
+                       return FutureUtils.completedExceptionally(
+                               new FlinkException(
+                                       String.format(
+                                               "Cannot trigger a savepoint 
because the job %s is not a streaming job.",
+                                               jobGraph.getJobID())));
                }
        }
 

Reply via email to