This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 49c4ed29f64b9b028f7b427bc6dff2ea2a137fb5
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Fri Dec 3 13:50:27 2021 +0100

    [FLINK-25155] Support claim mode in cli
---
 .../apache/flink/client/cli/CliFrontendParser.java | 40 ++++++++++++++++-----
 .../flink/client/cli/CliFrontendRunTest.java       | 41 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 8 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 8d960d2..a5f31c5 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -19,6 +19,9 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.configuration.CheckpointingOptions;
+import org.apache.flink.configuration.ConfigurationUtils;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
+import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import org.apache.commons.cli.CommandLine;
@@ -129,6 +132,15 @@ public class CliFrontendParser {
                             + "You need to allow this if you removed an 
operator from your "
                             + "program that was part of the program when the 
savepoint was triggered.");
 
+    public static final Option SAVEPOINT_RESTORE_MODE =
+            new Option(
+                    "restoreMode",
+                    true,
+                    "Defines how should we restore from the given savepoint. 
Supported options: "
+                            + "[claim - claim ownership of the savepoint and 
delete once it is"
+                            + " subsumed, legacy (default) - do not assume 
ownership of the"
+                            + " savepoint files.");
+
     static final Option SAVEPOINT_DISPOSE_OPTION =
             new Option("d", "dispose", true, "Path of savepoint to dispose.");
 
@@ -286,6 +298,7 @@ public class CliFrontendParser {
         SAVEPOINT_PATH_OPTION.setArgName("savepointPath");
 
         SAVEPOINT_ALLOW_NON_RESTORED_OPTION.setRequired(false);
+        SAVEPOINT_RESTORE_MODE.setRequired(false);
 
         ZOOKEEPER_NAMESPACE_OPTION.setRequired(false);
         ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace");
@@ -363,10 +376,10 @@ public class CliFrontendParser {
     }
 
     public static Options getRunCommandOptions() {
-        Options options = buildGeneralOptions(new Options());
-        options = getProgramSpecificOptions(options);
-        options.addOption(SAVEPOINT_PATH_OPTION);
-        return options.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
+        return getProgramSpecificOptions(buildGeneralOptions(new Options()))
+                .addOption(SAVEPOINT_PATH_OPTION)
+                .addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION)
+                .addOption(SAVEPOINT_RESTORE_MODE);
     }
 
     static Options getInfoCommandOptions() {
@@ -403,9 +416,10 @@ public class CliFrontendParser {
     // 
--------------------------------------------------------------------------------------------
 
     private static Options getRunOptionsWithoutDeprecatedOptions(Options 
options) {
-        Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options);
-        o.addOption(SAVEPOINT_PATH_OPTION);
-        return o.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
+        return getProgramSpecificOptionsWithoutDeprecatedOptions(options)
+                .addOption(SAVEPOINT_PATH_OPTION)
+                .addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION)
+                .addOption(SAVEPOINT_RESTORE_MODE);
     }
 
     private static Options getInfoOptionsWithoutDeprecatedOptions(Options 
options) {
@@ -593,7 +607,17 @@ public class CliFrontendParser {
             String savepointPath = 
commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt());
             boolean allowNonRestoredState =
                     
commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt());
-            return SavepointRestoreSettings.forPath(savepointPath, 
allowNonRestoredState);
+            final RestoreMode restoreMode;
+            if (commandLine.hasOption(SAVEPOINT_RESTORE_MODE)) {
+                restoreMode =
+                        ConfigurationUtils.convertValue(
+                                
commandLine.getOptionValue(SAVEPOINT_RESTORE_MODE),
+                                RestoreMode.class);
+            } else {
+                restoreMode = 
SavepointConfigOptions.RESTORE_MODE.defaultValue();
+            }
+            return SavepointRestoreSettings.forPath(
+                    savepointPath, allowNonRestoredState, restoreMode);
         } else {
             return SavepointRestoreSettings.none();
         }
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
index 968784e..bf47053 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader;
 import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.RestoreMode;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 
 import org.apache.commons.cli.CommandLine;
@@ -130,6 +131,46 @@ public class CliFrontendRunTest extends CliFrontendTestBase {
         }
     }
 
+    @Test
+    public void testClaimRestoreModeParsing() throws Exception {
+        // test configure savepoint with claim mode
+        String[] parameters = {
+            "-s", "expectedSavepointPath", "-n", "-restoreMode", "claim", 
getTestJarPath()
+        };
+
+        CommandLine commandLine =
+                CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, 
parameters, true);
+        ProgramOptions programOptions = ProgramOptions.create(commandLine);
+        ExecutionConfigAccessor executionOptions =
+                ExecutionConfigAccessor.fromProgramOptions(programOptions, 
Collections.emptyList());
+
+        SavepointRestoreSettings savepointSettings = 
executionOptions.getSavepointRestoreSettings();
+        assertTrue(savepointSettings.restoreSavepoint());
+        assertEquals(RestoreMode.CLAIM, savepointSettings.getRestoreMode());
+        assertEquals("expectedSavepointPath", 
savepointSettings.getRestorePath());
+        assertTrue(savepointSettings.allowNonRestoredState());
+    }
+
+    @Test
+    public void testLegacyRestoreModeParsing() throws Exception {
+        // test configure savepoint with legacy mode
+        String[] parameters = {
+            "-s", "expectedSavepointPath", "-n", "-restoreMode", "legacy", 
getTestJarPath()
+        };
+
+        CommandLine commandLine =
+                CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, 
parameters, true);
+        ProgramOptions programOptions = ProgramOptions.create(commandLine);
+        ExecutionConfigAccessor executionOptions =
+                ExecutionConfigAccessor.fromProgramOptions(programOptions, 
Collections.emptyList());
+
+        SavepointRestoreSettings savepointSettings = 
executionOptions.getSavepointRestoreSettings();
+        assertTrue(savepointSettings.restoreSavepoint());
+        assertEquals(RestoreMode.NO_CLAIM, savepointSettings.getRestoreMode());
+        assertEquals("expectedSavepointPath", 
savepointSettings.getRestorePath());
+        assertTrue(savepointSettings.allowNonRestoredState());
+    }
+
     @Test(expected = CliArgsException.class)
     public void testUnrecognizedOption() throws Exception {
         // test unrecognized option

Reply via email to