This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d79590d491e12784ef1b98c0aca6b742a6e98787
Author: Stephan Ewen <[email protected]>
AuthorDate: Fri Nov 27 22:34:51 2020 +0100

    [FLINK-20396][checkpointing] Checkpoint Coordinator exposes the checkpoint ID of the restored checkpoint for partial state restores.
    
    This closes #14255
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 35 ++++++++++++++++------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index cccebf7..1c9f76d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -67,6 +67,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.PriorityQueue;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -1227,11 +1228,13 @@ public class CheckpointCoordinator {
                        boolean errorIfNoCheckpoint,
                        boolean allowNonRestoredState) throws Exception {
 
-               return restoreLatestCheckpointedStateInternal(
+               final OptionalLong restoredCheckpointId =  
restoreLatestCheckpointedStateInternal(
                                new HashSet<>(tasks.values()),
                                
OperatorCoordinatorRestoreBehavior.RESTORE_OR_RESET,
                                errorIfNoCheckpoint,
                                allowNonRestoredState);
+
+               return restoredCheckpointId.isPresent();
        }
 
        /**
@@ -1242,7 +1245,9 @@ public class CheckpointCoordinator {
         * @param tasks Set of job vertices to restore. State for these 
vertices is
         * restored via {@link 
Execution#setInitialState(JobManagerTaskRestore)}.
 
-        * @return <code>true</code> if state was restored, <code>false</code> 
otherwise.
+        * @return An {@code OptionalLong} with the checkpoint ID, if state was 
restored,
+        *         an empty {@code OptionalLong} otherwise.
+        *
         * @throws IllegalStateException If the CheckpointCoordinator is shut 
down.
         * @throws IllegalStateException If no completed checkpoint is 
available and
         *                               the <code>failIfNoCheckpoint</code> 
flag has been set.
@@ -1255,7 +1260,7 @@ public class CheckpointCoordinator {
         *                               that restores <i>non-partitioned</i> 
state from this
         *                               checkpoint.
         */
-       public boolean restoreLatestCheckpointedStateToSubtasks(final 
Set<ExecutionJobVertex> tasks) throws Exception {
+       public OptionalLong restoreLatestCheckpointedStateToSubtasks(final 
Set<ExecutionJobVertex> tasks) throws Exception {
                // when restoring subtasks only we accept potentially unmatched 
state for the
                // following reasons
                //   - the set frequently does not include all Job Vertices 
(only the ones that are part
@@ -1296,11 +1301,13 @@ public class CheckpointCoordinator {
                        final Set<ExecutionJobVertex> tasks,
                        final boolean allowNonRestoredState) throws Exception {
 
-               return restoreLatestCheckpointedStateInternal(
+               final OptionalLong restoredCheckpointId = 
restoreLatestCheckpointedStateInternal(
                                tasks,
                                
OperatorCoordinatorRestoreBehavior.RESTORE_OR_RESET, // global recovery 
restores coordinators, or resets them to empty
                                false,   // recovery might come before first 
successful checkpoint
                                allowNonRestoredState);
+
+               return restoredCheckpointId.isPresent();
        }
 
        /**
@@ -1314,14 +1321,22 @@ public class CheckpointCoordinator {
         * @return True, if a checkpoint was found and its state was restored, 
false otherwise.
         */
        public boolean restoreInitialCheckpointIfPresent(final 
Set<ExecutionJobVertex> tasks) throws Exception {
-               return restoreLatestCheckpointedStateInternal(
+               final OptionalLong restoredCheckpointId = 
restoreLatestCheckpointedStateInternal(
                        tasks,
                        
OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
                        false,    // initial checkpoints exist only on 
JobManager failover. ok if not present.
                        false); // JobManager failover means JobGraphs match 
exactly.
+
+               return restoredCheckpointId.isPresent();
        }
 
-       private boolean restoreLatestCheckpointedStateInternal(
+       /**
+        * Performs the actual restore operation to the given tasks.
+        *
+        * <p>This method returns the restored checkpoint ID (as an optional) 
or an empty
+        * optional, if no checkpoint was restored.
+        */
+       private OptionalLong restoreLatestCheckpointedStateInternal(
                final Set<ExecutionJobVertex> tasks,
                final OperatorCoordinatorRestoreBehavior 
operatorCoordinatorRestoreBehavior,
                final boolean errorIfNoCheckpoint,
@@ -1370,7 +1385,7 @@ public class CheckpointCoordinator {
                                                        Collections.emptyMap());
                                }
 
-                               return false;
+                               return OptionalLong.empty();
                        }
 
                        LOG.info("Restoring job {} from latest valid 
checkpoint: {}.", job, latest);
@@ -1411,7 +1426,7 @@ public class CheckpointCoordinator {
                                statsTracker.reportRestoredCheckpoint(restored);
                        }
 
-                       return true;
+                       return OptionalLong.of(latest.getCheckpointID());
                }
        }
 
@@ -1452,11 +1467,13 @@ public class CheckpointCoordinator {
 
                LOG.info("Reset the checkpoint ID of job {} to {}.", job, 
nextCheckpointId);
 
-               return restoreLatestCheckpointedStateInternal(
+               final OptionalLong restoredCheckpointId = 
restoreLatestCheckpointedStateInternal(
                                new HashSet<>(tasks.values()),
                                
OperatorCoordinatorRestoreBehavior.RESTORE_IF_CHECKPOINT_PRESENT,
                                true,
                                allowNonRestored);
+
+               return restoredCheckpointId.isPresent();
        }
 
        // 
------------------------------------------------------------------------

Reply via email to