Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5622#discussion_r171857387
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
---
@@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final
ResourceID resourceID) {
@Override
public CompletableFuture<String> triggerSavepoint(
- @Nullable final String targetDirectory,
- final Time timeout) {
- try {
- return executionGraph.getCheckpointCoordinator()
- .triggerSavepoint(System.currentTimeMillis(),
targetDirectory)
-
.thenApply(CompletedCheckpoint::getExternalPointer);
- } catch (Exception e) {
- return FutureUtils.completedExceptionally(e);
+ @Nullable final String targetDirectory,
+ final boolean cancelJob,
+ final Time timeout) {
+
+ final CheckpointCoordinator checkpointCoordinator =
executionGraph.getCheckpointCoordinator();
+ if (checkpointCoordinator == null) {
+ return FutureUtils.completedExceptionally(new
IllegalStateException(
+ String.format("Job %s is not a streaming job.",
jobGraph.getJobID())));
--- End diff --
If the job is in a terminal state, the coordinator will be `null`ed as well.
---