Repository: flink Updated Branches: refs/heads/release-1.5 69ff5a744 -> 11c30c8cc
[hotfix] [flip6] Harden JobMaster#triggerSavepoint Check first whether the CheckpointCoordinator has been set before triggering a savepoint. If it has not been set, then return a failure message. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bce18dde Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bce18dde Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bce18dde Branch: refs/heads/release-1.5 Commit: bce18dded573d93bf7b1ea4b6633af87846ad493 Parents: 2532b11 Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Mar 1 20:09:55 2018 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Sat Mar 3 00:07:51 2018 +0100 ---------------------------------------------------------------------- .../apache/flink/runtime/jobmaster/JobMaster.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bce18dde/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index bfe8aaa..cc4cdba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -971,14 +971,20 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast @Override public CompletableFuture<String> triggerSavepoint( - @Nullable final String targetDirectory, - final Time timeout) { - try { - return executionGraph.getCheckpointCoordinator() + @Nullable final String targetDirectory, + final Time timeout) { + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + + if (checkpointCoordinator != null) { + return checkpointCoordinator .triggerSavepoint(System.currentTimeMillis(), targetDirectory) .thenApply(CompletedCheckpoint::getExternalPointer); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); + } else { + return FutureUtils.completedExceptionally( + new FlinkException( + String.format( + "Cannot trigger a savepoint because the job %s is not a streaming job.", + jobGraph.getJobID()))); } }