This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7641c239b777273126c10efab334e3247771a28b Author: Dawid Wysakowicz <[email protected]> AuthorDate: Fri Dec 3 15:47:03 2021 +0100 [FLINK-25155] Support claim mode in rest api --- .../shortcodes/generated/rest_v1_dispatcher.html | 10 ++++++++++ .../apache/flink/client/cli/CliFrontendRunTest.java | 2 +- .../runtime/webmonitor/handlers/JarRunHandler.java | 9 ++++++++- .../runtime/webmonitor/handlers/JarRunRequestBody.java | 18 ++++++++++++++++-- .../handlers/JarRunHandlerParameterTest.java | 6 ++++-- .../webmonitor/handlers/JarRunRequestBodyTest.java | 11 ++++++++++- .../src/test/resources/rest_api_v1.snapshot | 6 +++++- 7 files changed, 54 insertions(+), 8 deletions(-) diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index 4e4b8ac..d4f7354 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -869,6 +869,10 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa "type" : "string" } }, + "restoreMode" : { + "type" : "string", + "enum" : [ "CLAIM", "LEGACY" ] + }, "savepointPath" : { "type" : "string" } @@ -1028,6 +1032,9 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo", "properties" : { + "mtime" : { + "type" : "integer" + }, "name" : { "type" : "string" }, @@ -5729,6 +5736,9 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa "type" : "object", "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:LogInfo", "properties" : { + "mtime" : { + "type" : "integer" + }, "name" : { "type" : "string" }, diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java index bf47053..861bb1f 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendRunTest.java @@ -166,7 +166,7 @@ public class CliFrontendRunTest extends CliFrontendTestBase { SavepointRestoreSettings savepointSettings = executionOptions.getSavepointRestoreSettings(); assertTrue(savepointSettings.restoreSavepoint()); - assertEquals(RestoreMode.NO_CLAIM, savepointSettings.getRestoreMode()); + assertEquals(RestoreMode.LEGACY, savepointSettings.getRestoreMode()); assertEquals("expectedSavepointPath", savepointSettings.getRestorePath()); assertTrue(savepointSettings.allowNonRestoredState()); } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index ad258d6..be57752 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -26,6 +26,8 @@ import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.RestoreMode; +import org.apache.flink.runtime.jobgraph.SavepointConfigOptions; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; @@ -40,6 +42,7 @@ import javax.annotation.Nonnull; import java.nio.file.Path; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -142,10 +145,14 @@ public class JarRunHandler request, SavepointPathQueryParameter.class)), null, log); + final RestoreMode restoreMode = + Optional.ofNullable(requestBody.getRestoreMode()) + .orElseGet(SavepointConfigOptions.RESTORE_MODE::defaultValue); final SavepointRestoreSettings savepointRestoreSettings; if (savepointPath != null) { savepointRestoreSettings = - SavepointRestoreSettings.forPath(savepointPath, allowNonRestoredState); + SavepointRestoreSettings.forPath( + savepointPath, allowNonRestoredState, restoreMode); } else { savepointRestoreSettings = SavepointRestoreSettings.none(); } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java index 2ca235d..8e8e3fd 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; @@ -35,6 +36,7 @@ import java.util.List; public class JarRunRequestBody extends JarRequestBody { private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState"; private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath"; + private static final String FIELD_NAME_SAVEPOINT_RESTORE_MODE = "restoreMode"; @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) @Nullable @@ -44,8 +46,12 @@ public class JarRunRequestBody extends JarRequestBody { @Nullable private String savepointPath; + @JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE) + @Nullable + private RestoreMode restoreMode; + public JarRunRequestBody() { - this(null, null, null, null, null, null, null); + this(null, null, null, null, null, null, null, null); } @JsonCreator @@ -58,10 +64,12 @@ public class JarRunRequestBody extends JarRequestBody { @Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId, @Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean allowNonRestoredState, - @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) { + @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath, + @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_RESTORE_MODE) RestoreMode restoreMode) { super(entryClassName, programArguments, programArgumentsList, parallelism, jobId); this.allowNonRestoredState = allowNonRestoredState; this.savepointPath = savepointPath; + this.restoreMode = restoreMode; } @Nullable @@ -75,4 +83,10 @@ public class JarRunRequestBody extends JarRequestBody { public String getSavepointPath() { return savepointPath; } + + @Nullable + @JsonIgnore + public RestoreMode getRestoreMode() { + return restoreMode; + } } diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java index 5ad4d60..01835b0 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java @@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.DeploymentOptions; import org.apache.flink.runtime.dispatcher.DispatcherGateway; import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; @@ -182,12 +183,13 @@ public class JarRunHandlerParameterTest PARALLELISM, null, ALLOW_NON_RESTORED_STATE_QUERY, - RESTORE_PATH); + RESTORE_PATH, + RestoreMode.CLAIM); } @Override JarRunRequestBody getJarRequestBodyWithJobId(JobID jobId) { - return new JarRunRequestBody(null, null, null, null, jobId, null, null); + return new JarRunRequestBody(null, null, null, null, jobId, null, null, null); } @Test diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java index 38c964a..bec96cc 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.jobgraph.RestoreMode; import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase; import java.util.Arrays; @@ -36,7 +37,14 @@ public class JarRunRequestBodyTest extends RestRequestMarshallingTestBase<JarRun @Override protected JarRunRequestBody getTestRequestInstance() { return new JarRunRequestBody( - "hello", "world", Arrays.asList("boo", "far"), 4, new JobID(), true, "foo/bar"); + "hello", + "world", + Arrays.asList("boo", "far"), + 4, + new JobID(), + true, + "foo/bar", + RestoreMode.CLAIM); } @Override @@ -49,5 +57,6 @@ public class JarRunRequestBodyTest extends RestRequestMarshallingTestBase<JarRun assertEquals(expected.getJobId(), actual.getJobId()); assertEquals(expected.getAllowNonRestoredState(), actual.getAllowNonRestoredState()); assertEquals(expected.getSavepointPath(), actual.getSavepointPath()); + assertEquals(expected.getRestoreMode(), actual.getRestoreMode()); } } diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 279adbb..8ca5d08 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -435,6 +435,10 @@ }, "savepointPath" : { "type" : "string" + }, + "restoreMode" : { + "type" : "string", + "enum" : [ "CLAIM", "LEGACY" ] } } }, @@ -3374,4 +3378,4 @@ } } } ] -} \ No newline at end of file +}
