GitHub user GJL commented on a diff in the pull request: https://github.com/apache/flink/pull/5622#discussion_r171857387 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java --- @@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) { @Override public CompletableFuture<String> triggerSavepoint( - @Nullable final String targetDirectory, - final Time timeout) { - try { - return executionGraph.getCheckpointCoordinator() - .triggerSavepoint(System.currentTimeMillis(), targetDirectory) - .thenApply(CompletedCheckpoint::getExternalPointer); - } catch (Exception e) { - return FutureUtils.completedExceptionally(e); + @Nullable final String targetDirectory, + final boolean cancelJob, + final Time timeout) { + + final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator(); + if (checkpointCoordinator == null) { + return FutureUtils.completedExceptionally(new IllegalStateException( + String.format("Job %s is not a streaming job.", jobGraph.getJobID()))); --- End diff -- The coordinator is also set to `null` when the job is in a terminal state, so a `null` coordinator does not necessarily mean that the job is not a streaming job; the error message may be misleading in that case.
---