This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 49c4ed29f64b9b028f7b427bc6dff2ea2a137fb5 Author: Dawid Wysakowicz <[email protected]> AuthorDate: Fri Dec 3 13:50:27 2021 +0100 [FLINK-25155] Support claim mode in cli --- .../apache/flink/client/cli/CliFrontendParser.java | 40 ++++++++++++++++----- .../flink/client/cli/CliFrontendRunTest.java | 41 ++++++++++++++++++++++ 2 files changed, 73 insertions(+), 8 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index 8d960d2..a5f31c5 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -19,6 +19,9 @@ package org.apache.flink.client.cli; import org.apache.flink.configuration.CheckpointingOptions; +import org.apache.flink.configuration.ConfigurationUtils; +import org.apache.flink.runtime.jobgraph.RestoreMode; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.commons.cli.CommandLine; @@ -129,6 +132,15 @@ public class CliFrontendParser { + "You need to allow this if you removed an operator from your " + "program that was part of the program when the savepoint was triggered."); + public static final Option SAVEPOINT_RESTORE_MODE = + new Option( + "restoreMode", + true, + "Defines how should we restore from the given savepoint. Supported options: " + + "[claim - claim ownership of the savepoint and delete once it is" + + " subsumed, legacy (default) - do not assume ownership of the" + + " savepoint files."); + static final Option SAVEPOINT_DISPOSE_OPTION = new Option("d", "dispose", true, "Path of savepoint to dispose."); @@ -286,6 +298,7 @@ public class CliFrontendParser { SAVEPOINT_PATH_OPTION.setArgName("savepointPath"); SAVEPOINT_ALLOW_NON_RESTORED_OPTION.setRequired(false); + SAVEPOINT_RESTORE_MODE.setRequired(false); ZOOKEEPER_NAMESPACE_OPTION.setRequired(false); ZOOKEEPER_NAMESPACE_OPTION.setArgName("zookeeperNamespace"); @@ -363,10 +376,10 @@ public class CliFrontendParser { } public static Options getRunCommandOptions() { - Options options = buildGeneralOptions(new Options()); - options = getProgramSpecificOptions(options); - options.addOption(SAVEPOINT_PATH_OPTION); - return options.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION); + return getProgramSpecificOptions(buildGeneralOptions(new Options())) + .addOption(SAVEPOINT_PATH_OPTION) + .addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION) + .addOption(SAVEPOINT_RESTORE_MODE); } static Options getInfoCommandOptions() { @@ -403,9 +416,10 @@ public class CliFrontendParser { // -------------------------------------------------------------------------------------------- private static Options getRunOptionsWithoutDeprecatedOptions(Options options) { - Options o = getProgramSpecificOptionsWithoutDeprecatedOptions(options); - o.addOption(SAVEPOINT_PATH_OPTION); - return o.addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION); + return getProgramSpecificOptionsWithoutDeprecatedOptions(options) + .addOption(SAVEPOINT_PATH_OPTION) + .addOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION) + .addOption(SAVEPOINT_RESTORE_MODE); } private static Options getInfoOptionsWithoutDeprecatedOptions(Options options) { @@ -593,7 +607,17 @@ public class CliFrontendParser { String savepointPath = commandLine.getOptionValue(SAVEPOINT_PATH_OPTION.getOpt()); boolean allowNonRestoredState = commandLine.hasOption(SAVEPOINT_ALLOW_NON_RESTORED_OPTION.getOpt()); - return SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState); + final RestoreMode restoreMode; + if (commandLine.hasOption(SAVEPOINT_RESTORE_MODE)) { + restoreMode = + ConfigurationUtils.convertValue( + commandLine.getOptionValue(SAVEPOINT_RESTORE_MODE), + RestoreMode.class); + } else { + restoreMode = SavepointConfigOptions.RESTORE_MODE.defaultValue(); + } + return SavepointRestoreSettings.forPath( + savepointPath, allowNonRestoredState, restoreMode); } else { return SavepointRestoreSettings.none(); } diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java index 968784e..bf47053 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java @@ -22,6 +22,7 @@ import org.apache.flink.client.deployment.ClusterClientServiceLoader; import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.commons.cli.CommandLine; @@ -130,6 +131,46 @@ public class CliFrontendRunTest extends CliFrontendTestBase { } } + @Test + public void testClaimRestoreModeParsing() throws Exception { + // test configure savepoint with claim mode + String[] parameters = { + "-s", "expectedSavepointPath", "-n", "-restoreMode", "claim", getTestJarPath() + }; + + CommandLine commandLine = + CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true); + ProgramOptions programOptions = ProgramOptions.create(commandLine); + ExecutionConfigAccessor executionOptions = + ExecutionConfigAccessor.fromProgramOptions(programOptions, Collections.emptyList()); + + SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings(); + assertTrue(savepointSettings.restoreSavepoint()); + assertEquals(RestoreMode.CLAIM, savepointSettings.getRestoreMode()); + assertEquals("expectedSavepointPath", savepointSettings.getRestorePath()); + assertTrue(savepointSettings.allowNonRestoredState()); + } + + @Test + public void testLegacyRestoreModeParsing() throws Exception { + // test configure savepoint with claim mode + String[] parameters = { + "-s", "expectedSavepointPath", "-n", "-restoreMode", "legacy", getTestJarPath() + }; + + CommandLine commandLine = + CliFrontendParser.parse(CliFrontendParser.RUN_OPTIONS, parameters, true); + ProgramOptions programOptions = ProgramOptions.create(commandLine); + ExecutionConfigAccessor executionOptions = + ExecutionConfigAccessor.fromProgramOptions(programOptions, Collections.emptyList()); + + SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings(); + assertTrue(savepointSettings.restoreSavepoint()); + assertEquals(RestoreMode.NO_CLAIM, savepointSettings.getRestoreMode()); + assertEquals("expectedSavepointPath", savepointSettings.getRestorePath()); + assertTrue(savepointSettings.allowNonRestoredState()); + } + @Test(expected = CliArgsException.class) public void testUnrecognizedOption() throws Exception { // test unrecognized option
