Github user GJL commented on a diff in the pull request:
https://github.com/apache/flink/pull/5560#discussion_r170300254
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
---
@@ -534,42 +536,94 @@ public void postStop() throws Exception {
// 4. take a savepoint
final CompletableFuture<String> savepointFuture =
triggerSavepoint(
- jobMasterConfiguration.getTmpDirectory(),
- timeout);
+ null,
+ timeout)
+ .handleAsync(
+ (String savepointPath, Throwable throwable) -> {
+ if (throwable != null) {
+ final Throwable
strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
+ if (strippedThrowable
instanceof CheckpointTriggerException) {
+ final
CheckpointTriggerException checkpointTriggerException =
(CheckpointTriggerException) strippedThrowable;
+
+ if
(checkpointTriggerException.getCheckpointDeclineReason() ==
CheckpointDeclineReason.NOT_ALL_REQUIRED_TASKS_RUNNING) {
+ return
lastInternalSavepoint;
+ } else {
+ throw new
CompletionException(checkpointTriggerException);
+ }
+ } else {
+ throw new
CompletionException(strippedThrowable);
+ }
+ } else {
+ final String savepointToDispose
= lastInternalSavepoint;
--- End diff --
I think `savepointToDispose` be `null`.
---