Myasuka commented on a change in pull request #18024:
URL: https://github.com/apache/flink/pull/18024#discussion_r763929602



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1629,21 +1630,20 @@ private OptionalLong 
restoreLatestCheckpointedStateInternal(
     /**
      * Restore the state with given savepoint.
      *
-     * @param savepointPointer The pointer to the savepoint.
-     * @param allowNonRestored True if allowing checkpoint state that cannot 
be mapped to any job
-     *     vertex in tasks.
+     * @param restoreSettings TODO

Review comment:
       A left `TODO` is here.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -1655,10 +1655,28 @@ public boolean restoreSavepoint(
         final CompletedCheckpointStorageLocation checkpointLocation =
                 checkpointStorageView.resolveCheckpoint(savepointPointer);
 
+        // convert to checkpoint so the system can fall back to it
+        final CheckpointProperties checkpointProperties;
+        switch (restoreSettings.getRestoreMode()) {
+            case CLAIM:
+                checkpointProperties = this.checkpointProperties;
+                break;
+            case NO_CLAIM:
+                checkpointProperties = 
CheckpointProperties.forSavepoint(false);
+                break;
+            default:
+                throw new IllegalStateException("Unknown snapshot restore 
mode");

Review comment:
       I think a `IllegalArgumentException` could be better.

##########
File path: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -129,6 +132,16 @@
                             + "You need to allow this if you removed an 
operator from your "
                             + "program that was part of the program when the 
savepoint was triggered.");
 
+    public static final Option SAVEPOINT_RESTORE_MODE =
+            new Option(
+                    "r",

Review comment:
       The short opt 'r' conflicts with `RUNNING_OPTION`, which might impact 
the dictint option parsing. BTW, I think `Option.builder` could be better 
choose to create this option.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/SavepointConfigOptions.java
##########
@@ -46,4 +46,14 @@
                     .withDescription(
                             "Allow to skip savepoint state that cannot be 
restored. "
                                     + "Allow this if you removed an operator 
from your pipeline after the savepoint was triggered.");
+    /**
+     * Describes the mode how Flink should restore from the given savepoint or 
retained checkpoint.
+     */
+    public static final ConfigOption<RestoreMode> RESTORE_MODE =
+            key("execution.savepoint-restore-mode")
+                    .enumType(RestoreMode.class)
+                    .defaultValue(RestoreMode.NO_CLAIM)
+                    .withDescription(
+                            "Describes the mode how Flink should restore from 
the given"
+                                    + " savepoint or retained checkpoint.");

Review comment:
       Since we choose `NO_CLAIM` as default mode, I think we deserve to 
describe the difference compared with previous mode. Otherwise, users could 
face the problem of increasing file system space occupation after several 
rounds of launching jobs.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to