This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4758c3f9d8269f1affd1ebbc52a65480ecb42c06
Author: Yi Zhang <[email protected]>
AuthorDate: Tue Dec 23 14:03:28 2025 +0800

    [FLINK-38763][runtime-web] Add REST API for JarRunApplication
---
 .../application/PackagedProgramApplication.java    |  10 +
 .../runtime/webmonitor/WebSubmissionExtension.java |  19 +
 .../handlers/JarRunApplicationHandler.java         | 172 +++++++
 .../handlers/JarRunApplicationHeaders.java         |  85 ++++
 .../JarRunApplicationMessageParameters.java        |  28 ++
 .../handlers/JarRunApplicationRequestBody.java     | 120 +++++
 .../handlers/JarRunApplicationResponseBody.java    |  51 ++
 .../webmonitor/handlers/utils/JarHandlerUtils.java |  25 +-
 .../JarRunApplicationHandlerParameterTest.java     | 514 +++++++++++++++++++++
 .../handlers/JarRunApplicationRequestBodyTest.java |  73 +++
 .../JarRunApplicationResponseBodyTest.java         |  50 ++
 .../src/test/resources/rest_api_v1.snapshot        |  77 +++
 12 files changed, 1222 insertions(+), 2 deletions(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
index de5de97b752..95f54b69de1 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/PackagedProgramApplication.java
@@ -298,6 +298,16 @@ public class PackagedProgramApplication extends 
AbstractApplication {
         return finishApplicationFuture;
     }
 
+    @VisibleForTesting
+    public Configuration getConfiguration() {
+        return configuration;
+    }
+
+    @VisibleForTesting
+    public PackagedProgram getPackagedProgram() {
+        return program;
+    }
+
     private CompletableFuture<Acknowledge> onApplicationCanceledOrFailed(
             final DispatcherGateway dispatcherGateway,
             final ScheduledExecutor scheduledExecutor,
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
index 93ff7baae98..188ee9ece3c 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
@@ -32,6 +32,8 @@ import 
org.apache.flink.runtime.webmonitor.handlers.JarListHeaders;
 import org.apache.flink.runtime.webmonitor.handlers.JarPlanGetHeaders;
 import org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarPlanPostHeaders;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunApplicationHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunApplicationHeaders;
 import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
@@ -60,6 +62,7 @@ public class WebSubmissionExtension implements 
WebMonitorExtension {
     // for easier access during testing
     private final JarUploadHandler jarUploadHandler;
     private final JarRunHandler jarRunHandler;
+    private final JarRunApplicationHandler jarRunApplicationHandler;
 
     public WebSubmissionExtension(
             Configuration configuration,
@@ -132,6 +135,15 @@ public class WebSubmissionExtension implements 
WebMonitorExtension {
                         jarRunExecutor,
                         applicationRunnerSupplier);
 
+        jarRunApplicationHandler =
+                new JarRunApplicationHandler(
+                        leaderRetriever,
+                        timeout,
+                        responseHeaders,
+                        JarRunApplicationHeaders.getInstance(),
+                        jarDir,
+                        configuration);
+
         final JarDeleteHandler jarDeleteHandler =
                 new JarDeleteHandler(
                         leaderRetriever,
@@ -164,6 +176,8 @@ public class WebSubmissionExtension implements 
WebMonitorExtension {
         webSubmissionHandlers.add(Tuple2.of(JarUploadHeaders.getInstance(), 
jarUploadHandler));
         webSubmissionHandlers.add(Tuple2.of(JarListHeaders.getInstance(), 
jarListHandler));
         webSubmissionHandlers.add(Tuple2.of(JarRunHeaders.getInstance(), 
jarRunHandler));
+        webSubmissionHandlers.add(
+                Tuple2.of(JarRunApplicationHeaders.getInstance(), 
jarRunApplicationHandler));
         webSubmissionHandlers.add(Tuple2.of(JarDeleteHeaders.getInstance(), 
jarDeleteHandler));
         webSubmissionHandlers.add(Tuple2.of(JarPlanGetHeaders.getInstance(), 
jarPlanHandler));
         webSubmissionHandlers.add(Tuple2.of(JarPlanPostHeaders.getInstance(), 
postJarPlanHandler));
@@ -188,4 +202,9 @@ public class WebSubmissionExtension implements 
WebMonitorExtension {
     JarRunHandler getJarRunHandler() {
         return jarRunHandler;
     }
+
+    @VisibleForTesting
+    JarRunApplicationHandler getJarRunApplicationHandler() {
+        return jarRunApplicationHandler;
+    }
 }
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java
new file mode 100644
index 00000000000..ec8389f4e71
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandler.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
+import 
org.apache.flink.client.deployment.application.PackagedProgramApplication;
+import 
org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.DeploymentOptions;
+import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.core.execution.RecoveryClaimMode;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+
+import java.nio.file.Path;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static 
org.apache.flink.shaded.guava33.com.google.common.base.Strings.emptyToNull;
+
+/** Handler to submit applications from jars uploaded via the Web UI. */
+public class JarRunApplicationHandler
+        extends AbstractRestHandler<
+                DispatcherGateway,
+                JarRunApplicationRequestBody,
+                JarRunApplicationResponseBody,
+                JarRunApplicationMessageParameters> {
+
+    private final Path jarDir;
+
+    private final Configuration configuration;
+
+    public JarRunApplicationHandler(
+            final GatewayRetriever<? extends DispatcherGateway> 
leaderRetriever,
+            final Duration timeout,
+            final Map<String, String> responseHeaders,
+            final MessageHeaders<
+                            JarRunApplicationRequestBody,
+                            JarRunApplicationResponseBody,
+                            JarRunApplicationMessageParameters>
+                    messageHeaders,
+            final Path jarDir,
+            final Configuration configuration) {
+        super(leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+        this.jarDir = requireNonNull(jarDir);
+        this.configuration = requireNonNull(configuration);
+    }
+
+    @Override
+    @VisibleForTesting
+    public CompletableFuture<JarRunApplicationResponseBody> handleRequest(
+            @Nonnull final HandlerRequest<JarRunApplicationRequestBody> 
request,
+            @Nonnull final DispatcherGateway gateway)
+            throws RestHandlerException {
+
+        final Configuration effectiveConfiguration = new 
Configuration(configuration);
+        effectiveConfiguration.set(DeploymentOptions.TARGET, 
EmbeddedExecutor.NAME);
+
+        final JarHandlerContext context = 
JarHandlerContext.fromRequest(request, jarDir, log);
+        context.applyToConfiguration(effectiveConfiguration, request);
+        SavepointRestoreSettings.toConfiguration(
+                getSavepointRestoreSettings(request, effectiveConfiguration),
+                effectiveConfiguration);
+
+        final PackagedProgram program = 
context.toPackagedProgram(effectiveConfiguration);
+
+        ApplicationID applicationId = 
context.getApplicationId().orElse(ApplicationID.generate());
+        PackagedProgramApplication application =
+                new PackagedProgramApplication(
+                        applicationId,
+                        program,
+                        Collections.emptyList(),
+                        effectiveConfiguration,
+                        false,
+                        true,
+                        false,
+                        false);
+
+        return gateway.submitApplication(application, timeout)
+                .handle(
+                        (acknowledge, throwable) -> {
+                            if (throwable != null) {
+                                throw new CompletionException(
+                                        new RestHandlerException(
+                                                "Could not submit 
application.",
+                                                HttpResponseStatus.BAD_REQUEST,
+                                                throwable));
+                            }
+                            return new 
JarRunApplicationResponseBody(applicationId);
+                        });
+    }
+
+    private SavepointRestoreSettings getSavepointRestoreSettings(
+            final @Nonnull HandlerRequest<JarRunApplicationRequestBody> 
request,
+            final Configuration effectiveConfiguration)
+            throws RestHandlerException {
+
+        final JarRunApplicationRequestBody requestBody = 
request.getRequestBody();
+
+        final boolean allowNonRestoredState =
+                fromRequestBodyOrQueryParameter(
+                        requestBody.getAllowNonRestoredState().orElse(null),
+                        () -> getQueryParameter(request, 
AllowNonRestoredStateQueryParameter.class),
+                        effectiveConfiguration.get(
+                                
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE),
+                        log);
+        final String savepointPath =
+                fromRequestBodyOrQueryParameter(
+                        
emptyToNull(requestBody.getSavepointPath().orElse(null)),
+                        () ->
+                                emptyToNull(
+                                        getQueryParameter(
+                                                request, 
SavepointPathQueryParameter.class)),
+                        
effectiveConfiguration.get(StateRecoveryOptions.SAVEPOINT_PATH),
+                        log);
+        final RecoveryClaimMode recoveryClaimMode =
+                requestBody
+                        .getRecoveryClaimMode()
+                        
.orElse(effectiveConfiguration.get(StateRecoveryOptions.RESTORE_MODE));
+        if (recoveryClaimMode.equals(RecoveryClaimMode.LEGACY)) {
+            log.warn(
+                    "The {} restore mode is deprecated, please use {} or {} 
mode instead.",
+                    RecoveryClaimMode.LEGACY,
+                    RecoveryClaimMode.CLAIM,
+                    RecoveryClaimMode.NO_CLAIM);
+        }
+        final SavepointRestoreSettings savepointRestoreSettings;
+        if (savepointPath != null) {
+            savepointRestoreSettings =
+                    SavepointRestoreSettings.forPath(
+                            savepointPath, allowNonRestoredState, 
recoveryClaimMode);
+        } else {
+            savepointRestoreSettings = SavepointRestoreSettings.none();
+        }
+        return savepointRestoreSettings;
+    }
+}
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHeaders.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHeaders.java
new file mode 100644
index 00000000000..a8dbe701582
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHeaders.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.RuntimeMessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/** {@link MessageHeaders} for {@link JarRunApplicationHandler}. */
+public class JarRunApplicationHeaders
+        implements RuntimeMessageHeaders<
+                JarRunApplicationRequestBody,
+                JarRunApplicationResponseBody,
+                JarRunApplicationMessageParameters> {
+
+    private static final JarRunApplicationHeaders INSTANCE = new 
JarRunApplicationHeaders();
+
+    private static final String URL = "/jars/:" + JarIdPathParameter.KEY + 
"/run-application";
+
+    private JarRunApplicationHeaders() {}
+
+    @Override
+    public Class<JarRunApplicationResponseBody> getResponseClass() {
+        return JarRunApplicationResponseBody.class;
+    }
+
+    @Override
+    public HttpResponseStatus getResponseStatusCode() {
+        return HttpResponseStatus.ACCEPTED;
+    }
+
+    @Override
+    public Class<JarRunApplicationRequestBody> getRequestClass() {
+        return JarRunApplicationRequestBody.class;
+    }
+
+    @Override
+    public JarRunApplicationMessageParameters getUnresolvedMessageParameters() 
{
+        return new JarRunApplicationMessageParameters();
+    }
+
+    @Override
+    public HttpMethodWrapper getHttpMethod() {
+        return HttpMethodWrapper.POST;
+    }
+
+    @Override
+    public String getTargetRestEndpointURL() {
+        return URL;
+    }
+
+    public static JarRunApplicationHeaders getInstance() {
+        return INSTANCE;
+    }
+
+    @Override
+    public String getDescription() {
+        return "Submits an application by running a jar previously uploaded 
via '"
+                + JarUploadHeaders.URL
+                + "'. ";
+    }
+
+    @Override
+    public String operationId() {
+        return "submitApplicationFromJar";
+    }
+}
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationMessageParameters.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationMessageParameters.java
new file mode 100644
index 00000000000..235231da61d
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationMessageParameters.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+
+/**
+ * {@link MessageParameters} for {@link JarRunApplicationHandler}.
+ *
+ * <p>Currently identical to {@link JarRunMessageParameters}.
+ */
+public class JarRunApplicationMessageParameters extends 
JarRunMessageParameters {}
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationRequestBody.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationRequestBody.java
new file mode 100644
index 00000000000..b7bb0f577c1
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationRequestBody.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.RecoveryClaimMode;
+import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.runtime.rest.messages.json.ApplicationIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.ApplicationIDSerializer;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * {@link RequestBody} for running a jar as an application.
+ *
+ * <p>Nearly identical to {@link JarRunRequestBody}, but includes additional 
fields specific to the
+ * application and omits deprecated fields.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JarRunApplicationRequestBody extends JarRequestBody {
+    private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = 
"allowNonRestoredState";
+
+    private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
+
+    private static final String FIELD_NAME_SAVEPOINT_CLAIM_MODE = "claimMode";
+
+    private static final String FIELD_NAME_APPLICATION_ID = "applicationId";
+
+    @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
+    @Nullable
+    private final Boolean allowNonRestoredState;
+
+    @JsonProperty(FIELD_NAME_SAVEPOINT_PATH)
+    @Nullable
+    private final String savepointPath;
+
+    @JsonProperty(FIELD_NAME_SAVEPOINT_CLAIM_MODE)
+    @Nullable
+    private final RecoveryClaimMode recoveryClaimMode;
+
+    @JsonProperty(FIELD_NAME_APPLICATION_ID)
+    @JsonDeserialize(using = ApplicationIDDeserializer.class)
+    @JsonSerialize(using = ApplicationIDSerializer.class)
+    @Nullable
+    private final ApplicationID applicationId;
+
+    public JarRunApplicationRequestBody() {
+        this(null, null, null, null, null, null, null, null, null);
+    }
+
+    @JsonCreator
+    public JarRunApplicationRequestBody(
+            @Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String 
entryClassName,
+            @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST)
+                    List<String> programArgumentsList,
+            @Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer 
parallelism,
+            @Nullable @JsonProperty(FIELD_NAME_JOB_ID) JobID jobId,
+            @Nullable @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
+                    Boolean allowNonRestoredState,
+            @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String 
savepointPath,
+            @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_CLAIM_MODE)
+                    RecoveryClaimMode recoveryClaimMode,
+            @Nullable @JsonProperty(FIELD_NAME_FLINK_CONFIGURATION)
+                    Map<String, String> flinkConfiguration,
+            @Nullable @JsonProperty(FIELD_NAME_APPLICATION_ID) ApplicationID 
applicationId) {
+        super(entryClassName, programArgumentsList, parallelism, jobId, 
flinkConfiguration);
+        this.allowNonRestoredState = allowNonRestoredState;
+        this.savepointPath = savepointPath;
+        this.recoveryClaimMode = recoveryClaimMode;
+        this.applicationId = applicationId;
+    }
+
+    @JsonIgnore
+    public Optional<Boolean> getAllowNonRestoredState() {
+        return Optional.ofNullable(allowNonRestoredState);
+    }
+
+    @JsonIgnore
+    public Optional<String> getSavepointPath() {
+        return Optional.ofNullable(savepointPath);
+    }
+
+    @JsonIgnore
+    public Optional<RecoveryClaimMode> getRecoveryClaimMode() {
+        return Optional.ofNullable(recoveryClaimMode);
+    }
+
+    @JsonIgnore
+    public Optional<ApplicationID> getApplicationId() {
+        return Optional.ofNullable(applicationId);
+    }
+}
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationResponseBody.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationResponseBody.java
new file mode 100644
index 00000000000..74394efe954
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationResponseBody.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.ApplicationIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.ApplicationIDSerializer;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import static java.util.Objects.requireNonNull;
+
+/** Response for {@link JarRunApplicationHandler}. */
+public class JarRunApplicationResponseBody implements ResponseBody {
+
+    @JsonProperty("applicationid")
+    @JsonDeserialize(using = ApplicationIDDeserializer.class)
+    @JsonSerialize(using = ApplicationIDSerializer.class)
+    private final ApplicationID applicationId;
+
+    @JsonCreator
+    public JarRunApplicationResponseBody(
+            @JsonProperty("applicationid") @JsonDeserialize(using = 
ApplicationIDDeserializer.class)
+                    final ApplicationID applicationId) {
+        this.applicationId = requireNonNull(applicationId);
+    }
+
+    public ApplicationID getApplicationId() {
+        return applicationId;
+    }
+}
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
index a48f230eac3..cb8d21f22df 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers.utils;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ApplicationID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
@@ -35,6 +36,7 @@ import 
org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.webmonitor.handlers.EntryClassQueryParameter;
 import org.apache.flink.runtime.webmonitor.handlers.JarIdPathParameter;
 import org.apache.flink.runtime.webmonitor.handlers.JarRequestBody;
+import 
org.apache.flink.runtime.webmonitor.handlers.JarRunApplicationRequestBody;
 import org.apache.flink.runtime.webmonitor.handlers.ParallelismQueryParameter;
 import org.apache.flink.runtime.webmonitor.handlers.ProgramArgQueryParameter;
 
@@ -52,6 +54,7 @@ import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 import java.util.concurrent.CompletionException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -76,18 +79,21 @@ public class JarHandlerUtils {
         private final List<String> programArgs;
         private final int parallelism;
         private final JobID jobId;
+        @Nullable final ApplicationID applicationId;
 
         private JarHandlerContext(
                 Path jarFile,
                 String entryClass,
                 List<String> programArgs,
                 int parallelism,
-                JobID jobId) {
+                JobID jobId,
+                @Nullable ApplicationID applicationId) {
             this.jarFile = jarFile;
             this.entryClass = entryClass;
             this.programArgs = programArgs;
             this.parallelism = parallelism;
             this.jobId = jobId;
+            this.applicationId = applicationId;
         }
 
         public static <R extends JarRequestBody> JarHandlerContext fromRequest(
@@ -128,7 +134,18 @@ public class JarHandlerUtils {
                             null, // Delegate default job ID to actual 
JobGraph generation
                             log);
 
-            return new JarHandlerContext(jarFile, entryClass, programArgs, 
parallelism, jobId);
+            final ApplicationID applicationId;
+            if (requestBody instanceof JarRunApplicationRequestBody) {
+                applicationId =
+                        ((JarRunApplicationRequestBody) requestBody)
+                                .getApplicationId()
+                                .orElse(null);
+            } else {
+                applicationId = null;
+            }
+
+            return new JarHandlerContext(
+                    jarFile, entryClass, programArgs, parallelism, jobId, 
applicationId);
         }
 
         public void applyToConfiguration(
@@ -217,6 +234,10 @@ public class JarHandlerUtils {
         JobID getJobId() {
             return jobId;
         }
+
+        public Optional<ApplicationID> getApplicationId() {
+            return Optional.ofNullable(applicationId);
+        }
     }
 
     private static List<URL> getClasspaths(Configuration configuration) {
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandlerParameterTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandlerParameterTest.java
new file mode 100644
index 00000000000..ac8299f58e6
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationHandlerParameterTest.java
@@ -0,0 +1,514 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobID;
+import 
org.apache.flink.client.deployment.application.PackagedProgramApplication;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.PipelineOptions;
+import org.apache.flink.configuration.PipelineOptionsInternal;
+import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.core.execution.RecoveryClaimMode;
+import org.apache.flink.core.testutils.AllCallbackWrapper;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.messages.MessageParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.util.BlobServerExtension;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for the parameter handling of the {@link JarRunApplicationHandler}. 
*/
+class JarRunApplicationHandlerParameterTest {
+    static final String[] PROG_ARGS = new String[] {"--host", "localhost", 
"--port", "1234"};
+    static final int PARALLELISM = 4;
+    static final boolean ALLOW_NON_RESTORED_STATE_QUERY = true;
+    static final String RESTORE_PATH = "/foo/bar";
+    static final RecoveryClaimMode RESTORE_MODE = RecoveryClaimMode.CLAIM;
+
+    @RegisterExtension
+    private static final AllCallbackWrapper<BlobServerExtension> 
blobServerExtension =
+            new AllCallbackWrapper<>(new BlobServerExtension());
+
+    static final AtomicReference<PackagedProgramApplication> 
LAST_SUBMITTED_APPLICATION_REFERENCE =
+            new AtomicReference<>();
+
+    static TestingDispatcherGateway restfulGateway;
+    static Path jarDir;
+    static GatewayRetriever<TestingDispatcherGateway> gatewayRetriever =
+            () -> CompletableFuture.completedFuture(restfulGateway);
+    static CompletableFuture<String> localAddressFuture =
+            CompletableFuture.completedFuture("shazam://localhost:12345");
+    static Duration timeout = Duration.ofSeconds(10);
+    static Map<String, String> responseHeaders = Collections.emptyMap();
+
+    protected static Path jarWithManifest;
+    private static Path jarWithoutManifest;
+
+    private static JarRunApplicationHandler handler;
+
+    /**
+     * Flink configuration that the tests pass (as a map) in the request body, see {@link
+     * #getJarRequestBody()} and {@link #getJarRequestWithConfiguration()}.
+     */
+    private static final Configuration FLINK_CONFIGURATION =
+            new Configuration()
+                    .set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, Duration.ofMillis(120000L))
+                    .set(CoreOptions.DEFAULT_PARALLELISM, 57)
+                    .set(StateRecoveryOptions.SAVEPOINT_PATH, "/foo/bar/test")
+                    .set(StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE, false)
+                    .set(StateRecoveryOptions.RESTORE_MODE, RESTORE_MODE)
+                    // plain HashMap instead of double-brace initialization, which would create
+                    // an anonymous HashMap subclass just to hold a single entry
+                    .set(
+                            PipelineOptions.PARALLELISM_OVERRIDES,
+                            new HashMap<>(Collections.singletonMap("v1", "10")));
+
+    /**
+     * Prepares the shared fixtures via {@link #init(File)} and creates the handler under test.
+     *
+     * @param tempDir temporary directory that becomes the jar directory of the handler
+     */
+    @BeforeAll
+    static void setup(@TempDir File tempDir) throws Exception {
+        // init() assigns the static jarDir, restfulGateway, gatewayRetriever, timeout and
+        // responseHeaders fields, so they are used directly instead of shadowing them with
+        // identically valued locals.
+        init(tempDir);
+
+        handler =
+                new JarRunApplicationHandler(
+                        gatewayRetriever,
+                        timeout,
+                        responseHeaders,
+                        JarRunApplicationHeaders.getInstance(),
+                        jarDir,
+                        new Configuration());
+    }
+
+    static void init(File tmpDir) throws Exception {
+        jarDir = tmpDir.toPath();
+
+        // properties are set properly by the surefire plugin
+        final String parameterProgramJarName = 
System.getProperty("parameterJarName") + ".jar";
+        final String parameterProgramWithoutManifestJarName =
+                System.getProperty("parameterJarWithoutManifestName") + ".jar";
+        final Path jarLocation = Paths.get(System.getProperty("targetDir"));
+
+        jarWithManifest =
+                Files.copy(
+                        jarLocation.resolve(parameterProgramJarName),
+                        jarDir.resolve("program-with-manifest.jar"));
+        jarWithoutManifest =
+                Files.copy(
+                        
jarLocation.resolve(parameterProgramWithoutManifestJarName),
+                        jarDir.resolve("program-without-manifest.jar"));
+
+        restfulGateway =
+                TestingDispatcherGateway.newBuilder()
+                        .setBlobServerPort(
+                                
blobServerExtension.getCustomExtension().getBlobServerPort())
+                        .setSubmitApplicationFunction(
+                                application -> {
+                                    if (application instanceof 
PackagedProgramApplication) {
+                                        
LAST_SUBMITTED_APPLICATION_REFERENCE.set(
+                                                (PackagedProgramApplication) 
application);
+                                        return 
CompletableFuture.completedFuture(Acknowledge.get());
+                                    }
+                                    return FutureUtils.completedExceptionally(
+                                            new FlinkRuntimeException(
+                                                    "Unsupported application 
type"));
+                                })
+                        .build();
+
+        gatewayRetriever = () -> 
CompletableFuture.completedFuture(restfulGateway);
+        localAddressFuture = 
CompletableFuture.completedFuture("shazam://localhost:12345");
+        timeout = Duration.ofSeconds(10);
+        responseHeaders = Collections.emptyMap();
+    }
+
+    @Test
+    void testDefaultParameters() throws Exception {
+        // baseline, ensure that reasonable defaults are chosen
+        handleRequest(
+                createRequest(
+                        getDefaultJarRequestBody(),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest));
+
+        validateDefaultApplication();
+    }
+
+    /**
+     * Submits the jar without manifest together with a default (empty) request body and expects
+     * the request to fail with a {@link ProgramInvocationException} in the cause chain.
+     */
+    @Test
+    void testExceptionThrownWithoutManifestOrEntryClass() throws Exception {
+        final HandlerRequest<JarRunApplicationRequestBody> request =
+                createRequest(
+                        getDefaultJarRequestBody(),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithoutManifest);
+
+        // satisfies(...) runs the nested assertions directly; no predicate with a dummy
+        // "return true" is needed.
+        assertThatThrownBy(() -> handler.handleRequest(request, restfulGateway).get())
+                .satisfies(
+                        e -> {
+                            final Throwable throwable =
+                                    ExceptionUtils.stripCompletionException(e.getCause());
+
+                            final Optional<ProgramInvocationException> invocationException =
+                                    ExceptionUtils.findThrowable(
+                                            throwable, ProgramInvocationException.class);
+
+                            assertThat(invocationException).isPresent();
+
+                            final String exceptionMsg = invocationException.get().getMessage();
+                            assertThat(exceptionMsg)
+                                    .contains(
+                                            "Neither a 'Main-Class', nor a 'program-class' entry was found in the jar file.");
+                        });
+    }
+
+    @Test
+    void testQueryParameters() throws Exception {
+        handleRequest(
+                createRequest(
+                        getDefaultJarRequestBody(),
+                        getJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithoutManifest));
+
+        validateApplication();
+    }
+
+    @Test
+    void testRequestBody() throws Exception {
+        handleRequest(
+                createRequest(
+                        getJarRequestBody(),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithoutManifest));
+
+        validateApplication();
+    }
+
+    @Test
+    void testConfigurationViaRequestBody() throws Exception {
+        handleRequest(
+                createRequest(
+                        getJarRequestWithConfiguration(),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest));
+
+        validateApplicationWithConfiguration();
+    }
+
+    @Test
+    void testJobId() throws Exception {
+        final JobID jobId = new JobID();
+        handleRequest(
+                createRequest(
+                        getJarRequestBodyWithJobId(jobId),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest));
+
+        final PackagedProgramApplication application =
+                LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null);
+
+        assertThat(
+                        application
+                                .getConfiguration()
+                                
.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID))
+                .isEqualTo(jobId.toHexString());
+    }
+
+    @Test
+    void testApplicationId() throws Exception {
+        final ApplicationID applicationId = new ApplicationID();
+        handleRequest(
+                createRequest(
+                        getJarRequestBodyWithApplicationId(applicationId),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest));
+
+        final PackagedProgramApplication application =
+                LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null);
+
+        assertThat(application.getApplicationId()).isEqualTo(applicationId);
+    }
+
+    @Test
+    void testParameterPrioritization() throws Exception {
+        // parameters via query parameters and JSON request, JSON should be 
prioritized
+        handleRequest(
+                createRequest(
+                        getJarRequestBody(),
+                        getWrongJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithoutManifest));
+
+        validateApplication();
+    }
+
+    @Test
+    void testExceptionNotThrownWithEagerSink() throws Exception {
+        final Path jarLocation = Paths.get(System.getProperty("targetDir"));
+        final String parameterProgramWithEagerSink = 
"parameter-program-with-eager-sink.jar";
+        Path jarWithEagerSink =
+                Files.copy(
+                        jarLocation.resolve(parameterProgramWithEagerSink),
+                        jarDir.resolve("program-with-eager-sink.jar"));
+
+        // the handler does not run the program and should not throw an exception
+        handleRequest(
+                createRequest(
+                        getDefaultJarRequestBody(),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithEagerSink));
+
+        validateDefaultApplication();
+    }
+
+    @Test
+    void testEmptySavepointPath() throws Exception {
+        handleRequest(
+                createRequest(
+                        getJarRequestBodyWithSavepointPath(""),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest));
+
+        final PackagedProgramApplication application =
+                LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null);
+        final SavepointRestoreSettings savepointRestoreSettings =
+                
SavepointRestoreSettings.fromConfiguration(application.getConfiguration());
+
+        
assertThat(savepointRestoreSettings).isEqualTo(SavepointRestoreSettings.none());
+    }
+
+    @Test
+    void testParallelismOverrides() throws Exception {
+        handleRequest(
+                createRequest(
+                        getJarRequestWithConfiguration(),
+                        getUnresolvedJarMessageParameters(),
+                        getUnresolvedJarMessageParameters(),
+                        jarWithManifest));
+
+        final PackagedProgramApplication application =
+                LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null);
+
+        
assertThat(application.getConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES))
+                .containsOnlyKeys("v1")
+                .containsEntry("v1", "10");
+    }
+
+    protected static HandlerRequest<JarRunApplicationRequestBody> 
createRequest(
+            JarRunApplicationRequestBody requestBody,
+            JarRunApplicationMessageParameters parameters,
+            JarRunApplicationMessageParameters unresolvedMessageParameters,
+            Path jar)
+            throws HandlerRequestException {
+
+        final Map<String, List<String>> queryParameterAsMap =
+                parameters.getQueryParameters().stream()
+                        .filter(MessageParameter::isResolved)
+                        .collect(
+                                Collectors.toMap(
+                                        MessageParameter::getKey,
+                                        
JarRunApplicationHandlerParameterTest::getValuesAsString));
+
+        return HandlerRequest.resolveParametersAndCreate(
+                requestBody,
+                unresolvedMessageParameters,
+                Collections.singletonMap(JarIdPathParameter.KEY, 
jar.getFileName().toString()),
+                queryParameterAsMap,
+                Collections.emptyList());
+    }
+
+    private static <X> List<String> getValuesAsString(MessageQueryParameter<X> 
parameter) {
+        final List<X> values = parameter.getValue();
+        return 
values.stream().map(parameter::convertValueToString).collect(Collectors.toList());
+    }
+
+    JarRunApplicationMessageParameters getUnresolvedJarMessageParameters() {
+        return handler.getMessageHeaders().getUnresolvedMessageParameters();
+    }
+
+    JarRunApplicationMessageParameters getJarMessageParameters() {
+        final JarRunApplicationMessageParameters parameters = 
getUnresolvedJarMessageParameters();
+        parameters.allowNonRestoredStateQueryParameter.resolve(
+                Collections.singletonList(ALLOW_NON_RESTORED_STATE_QUERY));
+        
parameters.savepointPathQueryParameter.resolve(Collections.singletonList(RESTORE_PATH));
+        parameters.entryClassQueryParameter.resolve(
+                
Collections.singletonList(ParameterProgram.class.getCanonicalName()));
+        
parameters.parallelismQueryParameter.resolve(Collections.singletonList(PARALLELISM));
+        parameters.programArgQueryParameter.resolve(Arrays.asList(PROG_ARGS));
+        return parameters;
+    }
+
+    JarRunApplicationMessageParameters getWrongJarMessageParameters() {
+        List<String> wrongArgs =
+                Arrays.stream(PROG_ARGS).map(a -> a + 
"wrong").collect(Collectors.toList());
+        final JarRunApplicationMessageParameters parameters = 
getUnresolvedJarMessageParameters();
+        
parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false));
+        
parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh"));
+        parameters.entryClassQueryParameter.resolve(
+                Collections.singletonList("please.dont.run.me"));
+        
parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
+        parameters.programArgQueryParameter.resolve(wrongArgs);
+        return parameters;
+    }
+
+    JarRunApplicationRequestBody getDefaultJarRequestBody() {
+        return new JarRunApplicationRequestBody();
+    }
+
+    JarRunApplicationRequestBody getJarRequestBody() {
+        return new JarRunApplicationRequestBody(
+                ParameterProgram.class.getCanonicalName(),
+                Arrays.asList(PROG_ARGS),
+                PARALLELISM,
+                null,
+                ALLOW_NON_RESTORED_STATE_QUERY,
+                RESTORE_PATH,
+                RESTORE_MODE,
+                FLINK_CONFIGURATION.toMap(),
+                null);
+    }
+
+    private JarRunApplicationRequestBody 
getJarRequestBodyWithSavepointPath(String savepointPath) {
+        return new JarRunApplicationRequestBody(
+                ParameterProgram.class.getCanonicalName(),
+                Arrays.asList(PROG_ARGS),
+                PARALLELISM,
+                null,
+                ALLOW_NON_RESTORED_STATE_QUERY,
+                savepointPath,
+                RESTORE_MODE,
+                null,
+                null);
+    }
+
+    JarRunApplicationRequestBody getJarRequestBodyWithJobId(JobID jobId) {
+        return new JarRunApplicationRequestBody(
+                null, null, null, jobId, null, null, null, null, null);
+    }
+
+    JarRunApplicationRequestBody 
getJarRequestBodyWithApplicationId(ApplicationID applicationId) {
+        return new JarRunApplicationRequestBody(
+                null, null, null, null, null, null, null, null, applicationId);
+    }
+
+    JarRunApplicationRequestBody getJarRequestWithConfiguration() {
+        return new JarRunApplicationRequestBody(
+                null, null, null, null, null, null, null, 
FLINK_CONFIGURATION.toMap(), null);
+    }
+
+    void handleRequest(HandlerRequest<JarRunApplicationRequestBody> request) 
throws Exception {
+        handler.handleRequest(request, restfulGateway).get();
+    }
+
+    void validateDefaultApplication() {
+        PackagedProgramApplication application =
+                LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null);
+
+        assertThat(application.getPackagedProgram().getArguments()).isEmpty();
+
+        final Configuration configuration = application.getConfiguration();
+        assertThat(configuration.get(CoreOptions.DEFAULT_PARALLELISM))
+                
.isEqualTo(CoreOptions.DEFAULT_PARALLELISM.defaultValue().intValue());
+
+        final SavepointRestoreSettings savepointRestoreSettings =
+                SavepointRestoreSettings.fromConfiguration(configuration);
+        assertThat(savepointRestoreSettings.allowNonRestoredState()).isFalse();
+        assertThat(savepointRestoreSettings.getRestorePath()).isNull();
+    }
+
+    void validateApplication() {
+        PackagedProgramApplication application =
+                LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null);
+
+        
assertThat(application.getPackagedProgram().getArguments()).isEqualTo(PROG_ARGS);
+
+        final Configuration configuration = application.getConfiguration();
+        
assertThat(configuration.get(CoreOptions.DEFAULT_PARALLELISM)).isEqualTo(PARALLELISM);
+
+        final SavepointRestoreSettings savepointRestoreSettings =
+                SavepointRestoreSettings.fromConfiguration(configuration);
+        assertThat(savepointRestoreSettings.allowNonRestoredState())
+                .isEqualTo(ALLOW_NON_RESTORED_STATE_QUERY);
+        
assertThat(savepointRestoreSettings.getRestorePath()).isEqualTo(RESTORE_PATH);
+    }
+
+    void validateApplicationWithConfiguration() {
+        PackagedProgramApplication application =
+                LAST_SUBMITTED_APPLICATION_REFERENCE.getAndSet(null);
+        final Configuration configuration = application.getConfiguration();
+
+        assertThat(configuration.get(CoreOptions.DEFAULT_PARALLELISM))
+                
.isEqualTo(FLINK_CONFIGURATION.get(CoreOptions.DEFAULT_PARALLELISM));
+        
assertThat(configuration.get(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT))
+                
.isEqualTo(FLINK_CONFIGURATION.get(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT));
+
+        final SavepointRestoreSettings savepointRestoreSettings =
+                SavepointRestoreSettings.fromConfiguration(configuration);
+        assertThat(savepointRestoreSettings.getRecoveryClaimMode())
+                
.isEqualTo(FLINK_CONFIGURATION.get(StateRecoveryOptions.RESTORE_MODE));
+        assertThat(savepointRestoreSettings.getRestorePath())
+                
.isEqualTo(FLINK_CONFIGURATION.get(StateRecoveryOptions.SAVEPOINT_PATH));
+        assertThat(savepointRestoreSettings.allowNonRestoredState())
+                .isEqualTo(
+                        FLINK_CONFIGURATION.get(
+                                
StateRecoveryOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE));
+    }
+}
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationRequestBodyTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationRequestBodyTest.java
new file mode 100644
index 00000000000..49cce916178
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationRequestBodyTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.core.execution.RecoveryClaimMode;
+import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link JarRunApplicationRequestBody}. */
+@ExtendWith(NoOpTestExtension.class)
+class JarRunApplicationRequestBodyTest
+        extends RestRequestMarshallingTestBase<JarRunApplicationRequestBody> {
+
+    @Override
+    protected Class<JarRunApplicationRequestBody> getTestRequestClass() {
+        return JarRunApplicationRequestBody.class;
+    }
+
+    @Override
+    protected JarRunApplicationRequestBody getTestRequestInstance() {
+        return new JarRunApplicationRequestBody(
+                "hello",
+                Arrays.asList("boo", "far"),
+                4,
+                new JobID(),
+                true,
+                "foo/bar",
+                RecoveryClaimMode.CLAIM,
+                Collections.singletonMap("key", "value"),
+                new ApplicationID());
+    }
+
+    /**
+     * Verifies that the request body survives a marshalling round trip.
+     *
+     * @param expected the instance that was marshalled
+     * @param actual the instance obtained by unmarshalling
+     */
+    @Override
+    protected void assertOriginalEqualsToUnmarshalled(
+            final JarRunApplicationRequestBody expected,
+            final JarRunApplicationRequestBody actual) {
+        assertThat(actual.getEntryClassName()).isEqualTo(expected.getEntryClassName());
+        assertThat(actual.getProgramArgumentsList()).isEqualTo(expected.getProgramArgumentsList());
+        assertThat(actual.getParallelism()).isEqualTo(expected.getParallelism());
+        assertThat(actual.getJobId()).isEqualTo(expected.getJobId());
+        assertThat(actual.getAllowNonRestoredState())
+                .isEqualTo(expected.getAllowNonRestoredState());
+        assertThat(actual.getSavepointPath()).isEqualTo(expected.getSavepointPath());
+        assertThat(actual.getRecoveryClaimMode()).isEqualTo(expected.getRecoveryClaimMode());
+        assertThat(actual.getFlinkConfiguration().toMap())
+                .containsExactlyEntriesOf(expected.getFlinkConfiguration().toMap());
+        // the application ID is set on the test instance, so it has to be compared as well
+        assertThat(actual.getApplicationId()).isEqualTo(expected.getApplicationId());
+    }
+}
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationResponseBodyTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationResponseBodyTest.java
new file mode 100644
index 00000000000..673a2b91f41
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunApplicationResponseBodyTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.ApplicationID;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+import 
org.apache.flink.testutils.junit.extensions.parameterized.NoOpTestExtension;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Marshalling round-trip test for {@link JarRunApplicationResponseBody}.
+ *
+ * <p>Package-private, like the other JUnit 5 test classes in this package.
+ */
+@ExtendWith(NoOpTestExtension.class)
+class JarRunApplicationResponseBodyTest
+        extends RestResponseMarshallingTestBase<JarRunApplicationResponseBody> {
+
+    @Override
+    protected Class<JarRunApplicationResponseBody> getTestResponseClass() {
+        return JarRunApplicationResponseBody.class;
+    }
+
+    /** Creates a response carrying a freshly generated application ID. */
+    @Override
+    protected JarRunApplicationResponseBody getTestResponseInstance() {
+        return new JarRunApplicationResponseBody(new ApplicationID());
+    }
+
+    /** Compares the application ID of the original and the unmarshalled response. */
+    @Override
+    protected void assertOriginalEqualsToUnmarshalled(
+            final JarRunApplicationResponseBody expected,
+            final JarRunApplicationResponseBody actual) {
+        assertThat(actual.getApplicationId()).isEqualTo(expected.getApplicationId());
+    }
+}
diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot 
b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
index 406ebe85ef0..3b37c3eb6f0 100644
--- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
+++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -768,6 +768,83 @@
         }
       }
     }
+  }, {
+    "url" : "/jars/:jarid/run-application",
+    "method" : "POST",
+    "status-code" : "202 Accepted",
+    "file-upload" : false,
+    "path-parameters" : {
+      "pathParameters" : [ {
+        "key" : "jarid"
+      } ]
+    },
+    "query-parameters" : {
+      "queryParameters" : [ {
+        "key" : "allowNonRestoredState",
+        "mandatory" : false
+      }, {
+        "key" : "savepointPath",
+        "mandatory" : false
+      }, {
+        "key" : "programArg",
+        "mandatory" : false
+      }, {
+        "key" : "entry-class",
+        "mandatory" : false
+      }, {
+        "key" : "parallelism",
+        "mandatory" : false
+      } ]
+    },
+    "request" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunApplicationRequestBody",
+      "properties" : {
+        "entryClass" : {
+          "type" : "string"
+        },
+        "programArgsList" : {
+          "type" : "array",
+          "items" : {
+            "type" : "string"
+          }
+        },
+        "parallelism" : {
+          "type" : "integer"
+        },
+        "jobId" : {
+          "type" : "any"
+        },
+        "allowNonRestoredState" : {
+          "type" : "boolean"
+        },
+        "savepointPath" : {
+          "type" : "string"
+        },
+        "claimMode" : {
+          "type" : "string",
+          "enum" : [ "CLAIM", "NO_CLAIM", "LEGACY" ]
+        },
+        "flinkConfiguration" : {
+          "type" : "object",
+          "additionalProperties" : {
+            "type" : "string"
+          }
+        },
+        "applicationId" : {
+          "type" : "any"
+        }
+      }
+    },
+    "response" : {
+      "type" : "object",
+      "id" : "urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarRunApplicationResponseBody",
+      "properties" : {
+        "applicationid" : {
+          "type" : "any"
+        }
+      }
+    }
   }, {
     "url" : "/jobmanager/config",
     "method" : "GET",

Reply via email to