This is an automated email from the ASF dual-hosted git repository. jqin pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit d79590d491e12784ef1b98c0aca6b742a6e98787 Author: Stephan Ewen <[email protected]> AuthorDate: Fri Nov 27 22:34:51 2020 +0100 [FLINK-20396][checkpointing] Checkpoint Coordinator exposes the checkpoint ID of the restored checkpoint for partial state restores. This closes #14255 --- .../runtime/checkpoint/CheckpointCoordinator.java | 35 ++++++++++++++++------ 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index cccebf7..1c9f76d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -67,6 +67,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.OptionalLong; import java.util.PriorityQueue; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -1227,11 +1228,13 @@ public class CheckpointCoordinator { boolean errorIfNoCheckpoint, boolean allowNonRestoredState) throws Exception { - return restoreLatestCheckpointedStateInternal( + final OptionalLong restoredCheckpointId = restoreLatestCheckpointedStateInternal( new HashSet<>(tasks.values()), OperatorCoordinatorRestoreBehavior.RESTORE_OR_RESET, errorIfNoCheckpoint, allowNonRestoredState); + + return restoredCheckpointId.isPresent(); } /** @@ -1242,7 +1245,9 @@ public class CheckpointCoordinator { * @param tasks Set of job vertices to restore. State for these vertices is * restored via {@link Execution#setInitialState(JobManagerTaskRestore)}. - * @return <code>true</code> if state was restored, <code>false</code> otherwise. + * @return An {@code OptionalLong} with the checkpoint ID, if state was restored, + * an empty {@code OptionalLong} otherwise. + * * @throws IllegalStateException If the CheckpointCoordinator is shut down. * @throws IllegalStateException If no completed checkpoint is available and * the <code>failIfNoCheckpoint</code> flag has been set. @@ -1255,7 +1260,7 @@ public class CheckpointCoordinator { * that restores <i>non-partitioned</i> state from this * checkpoint. */ - public boolean restoreLatestCheckpointedStateToSubtasks(final Set<ExecutionJobVertex> tasks) throws Exception { + public OptionalLong restoreLatestCheckpointedStateToSubtasks(final Set<ExecutionJobVertex> tasks) throws Exception { // when restoring subtasks only we accept potentially unmatched state for the // following reasons // - the set frequently does not include all Job Vertices (only the ones that are part @@ -1296,11 +1301,13 @@ public class CheckpointCoordinator { final Set<ExecutionJobVertex> tasks, final boolean allowNonRestoredState) throws Exception { - return restoreLatestCheckpointedStateInternal( + final OptionalLong restoredCheckpointId = restoreLatestCheckpointedStateInternal( tasks, OperatorCoordinatorRestoreBehavior.RESTORE_OR_RESET, // global recovery restores coordinators, or resets them to empty false, // recovery might come before first successful checkpoint allowNonRestoredState); + + return restoredCheckpointId.isPresent(); } /** @@ -1314,14 +1321,22 @@ public class CheckpointCoordinator { * @return True, if a checkpoint was found and its state was restored, false otherwise. */ public boolean restoreInitialCheckpointIfPresent(final Set<ExecutionJobVertex> tasks) throws Exception { - return restoreLatestCheckpointedStateInternal( + final OptionalLong restoredCheckpointId = restoreLatestCheckpointedStateInternal( tasks, OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT, false, // initial checkpoints exist only on JobManager failover. ok if not present. false); // JobManager failover means JobGraphs match exactly. + + return restoredCheckpointId.isPresent(); } - private boolean restoreLatestCheckpointedStateInternal( + /** + * Performs the actual restore operation to the given tasks. + * + * <p>This method returns the restored checkpoint ID (as an optional) or an empty + * optional, if no checkpoint was restored. + */ + private OptionalLong restoreLatestCheckpointedStateInternal( final Set<ExecutionJobVertex> tasks, final OperatorCoordinatorRestoreBehavior operatorCoordinatorRestoreBehavior, final boolean errorIfNoCheckpoint, @@ -1370,7 +1385,7 @@ public class CheckpointCoordinator { Collections.emptyMap()); } - return false; + return OptionalLong.empty(); } LOG.info("Restoring job {} from latest valid checkpoint: {}.", job, latest); @@ -1411,7 +1426,7 @@ public class CheckpointCoordinator { statsTracker.reportRestoredCheckpoint(restored); } - return true; + return OptionalLong.of(latest.getCheckpointID()); } } @@ -1452,11 +1467,13 @@ public class CheckpointCoordinator { LOG.info("Reset the checkpoint ID of job {} to {}.", job, nextCheckpointId); - return restoreLatestCheckpointedStateInternal( + final OptionalLong restoredCheckpointId = restoreLatestCheckpointedStateInternal( new HashSet<>(tasks.values()), OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT, true, allowNonRestored); + + return restoredCheckpointId.isPresent(); } // ------------------------------------------------------------------------
