Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5622#discussion_r171857970
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
---
@@ -971,14 +971,44 @@ public void heartbeatFromResourceManager(final ResourceID resourceID) {
@Override
public CompletableFuture<String> triggerSavepoint(
- @Nullable final String targetDirectory,
- final Time timeout) {
- try {
- return executionGraph.getCheckpointCoordinator()
- .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
- .thenApply(CompletedCheckpoint::getExternalPointer);
- } catch (Exception e) {
- return FutureUtils.completedExceptionally(e);
+ @Nullable final String targetDirectory,
+ final boolean cancelJob,
+ final Time timeout) {
+
+ final CheckpointCoordinator checkpointCoordinator = executionGraph.getCheckpointCoordinator();
+ if (checkpointCoordinator == null) {
+ return FutureUtils.completedExceptionally(new IllegalStateException(
+ String.format("Job %s is not a streaming job.", jobGraph.getJobID())));
+ }
+
+ if (cancelJob) {
+ checkpointCoordinator.stopCheckpointScheduler();
+ }
+ return checkpointCoordinator
+ .triggerSavepoint(System.currentTimeMillis(), targetDirectory)
+ .thenApply(CompletedCheckpoint::getExternalPointer)
+ .thenApplyAsync(path -> {
+ if (cancelJob) {
+ log.info("Savepoint stored in {}. Now cancelling {}.", path, jobGraph.getJobID());
+ cancel(timeout);
+ }
+ return path;
+ }, getMainThreadExecutor())
+ .exceptionally(throwable -> {
+ if (cancelJob) {
+ startCheckpointScheduler(checkpointCoordinator);
+ }
+ throw new CompletionException(throwable);
+ });
+ }
+
+ private void startCheckpointScheduler(final CheckpointCoordinator checkpointCoordinator) {
--- End diff --
This new `startCheckpointScheduler` method can also be reused in the job rescaling logic.
---