[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16643527#comment-16643527
 ] 

ASF GitHub Bot commented on FLINK-10295:
----------------------------------------

zentol closed pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

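
For orientation before reading the diff: the handlers resolve each setting from the JSON request body first, then from the (deprecated) query parameter, then from a default. The following is a minimal, self-contained sketch of that precedence, modelled on the fromRequestBodyOrQueryParameter helper visible in the diff. The class name ParameterPrecedenceSketch and the method name fromBodyOrQuery are hypothetical; the sketch uses a plain java.util.function.Supplier and omits the deprecation logging and RestHandlerException handling of the real helper.

```java
import java.util.function.Supplier;

/** Hypothetical illustration only; not part of the Flink code base. */
public class ParameterPrecedenceSketch {

    /**
     * Returns bodyValue if it is not null, otherwise the query parameter value
     * if it is not null, otherwise defaultValue.
     */
    static <T> T fromBodyOrQuery(T bodyValue, Supplier<T> queryValue, T defaultValue) {
        if (bodyValue != null) {
            return bodyValue;
        }
        // The query parameter is only evaluated when the body has no value.
        T fromQuery = queryValue.get();
        if (fromQuery != null) {
            // The helper in the diff logs a deprecation warning at this point.
            return fromQuery;
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        System.out.println(fromBodyOrQuery(4, () -> 2, -1));       // body wins: prints 4
        System.out.println(fromBodyOrQuery(null, () -> 2, -1));    // query fallback: prints 2
        System.out.println(fromBodyOrQuery(null, () -> null, -1)); // default: prints -1
    }
}
```

Passing the query parameter as a supplier means it is looked up lazily, so a request that carries the value in its JSON body never touches the deprecated path.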

diff --git a/docs/_includes/generated/rest_v1_dispatcher.html b/docs/_includes/generated/rest_v1_dispatcher.html
index 6ee705eeb0a..4977e5ccfeb 100644
--- a/docs/_includes/generated/rest_v1_dispatcher.html
+++ b/docs/_includes/generated/rest_v1_dispatcher.html
@@ -272,7 +272,7 @@
       <td class="text-left">Response code: <code>200 OK</code></td>
     </tr>
     <tr>
-      <td colspan="2">Returns the dataflow plan of a job contained in a jar 
previously uploaded via '/jars/upload'.</td>
+      <td colspan="2">Returns the dataflow plan of a job contained in a jar 
previously uploaded via '/jars/upload'. Program arguments can be passed both 
via the JSON request (recommended) or query parameters.</td>
     </tr>
     <tr>
       <td colspan="2">Path parameters</td>
@@ -290,19 +290,40 @@
     <tr>
       <td colspan="2">
         <ul>
+<li><code>program-args</code> (optional): Deprecated, please use 'programArg' 
instead. String value that specifies the arguments for the program or plan</li>
+<li><code>programArg</code> (optional): Comma-separated list of program 
arguments.</li>
 <li><code>entry-class</code> (optional): String value that specifies the fully 
qualified name of the entry point class. Overrides the class defined in the jar 
file manifest.</li>
 <li><code>parallelism</code> (optional): Positive integer value that specifies 
the desired parallelism for the job.</li>
-<li><code>program-args</code> (optional): String value that specifies the 
arguments for the program or plan.</li>
         </ul>
       </td>
     </tr>
     <tr>
       <td colspan="2">
-        <button data-toggle="collapse" 
data-target="#-181694384">Request</button>
-        <div id="-181694384" class="collapse">
+        <button data-toggle="collapse" 
data-target="#550027726">Request</button>
+        <div id="550027726" class="collapse">
           <pre>
             <code>
-{}            </code>
+{
+  "type" : "object",
+  "id" : 
"urn:jsonschema:org:apache:flink:runtime:webmonitor:handlers:JarPlanRequestBody",
+  "properties" : {
+    "entryClass" : {
+      "type" : "string"
+    },
+    "programArgs" : {
+      "type" : "string"
+    },
+    "programArgsList" : {
+      "type" : "array",
+      "items" : {
+        "type" : "string"
+      }
+    },
+    "parallelism" : {
+      "type" : "integer"
+    }
+  }
+}            </code>
           </pre>
          </div>
       </td>
@@ -338,7 +359,7 @@
       <td class="text-left">Response code: <code>200 OK</code></td>
     </tr>
     <tr>
-      <td colspan="2">Submits a job by running a jar previously uploaded via 
'/jars/upload'.</td>
+      <td colspan="2">Submits a job by running a jar previously uploaded via 
'/jars/upload'. Program arguments can be passed both via the JSON request 
(recommended) or query parameters.</td>
     </tr>
     <tr>
       <td colspan="2">Path parameters</td>
@@ -356,11 +377,12 @@
     <tr>
       <td colspan="2">
         <ul>
-<li><code>program-args</code> (optional): String value that specifies the 
arguments for the program or plan.</li>
-<li><code>entry-class</code> (optional): String value that specifies the fully 
qualified name of the entry point class. Overrides the class defined in the jar 
file manifest.</li>
-<li><code>parallelism</code> (optional): Positive integer value that specifies 
the desired parallelism for the job.</li>
 <li><code>allowNonRestoredState</code> (optional): Boolean value that 
specifies whether the job submission should be rejected if the savepoint 
contains state that cannot be mapped back to the job.</li>
 <li><code>savepointPath</code> (optional): String value that specifies the 
path of the savepoint to restore the job from.</li>
+<li><code>program-args</code> (optional): Deprecated, please use 'programArg' 
instead. String value that specifies the arguments for the program or plan</li>
+<li><code>programArg</code> (optional): Comma-separated list of program 
arguments.</li>
+<li><code>entry-class</code> (optional): String value that specifies the fully 
qualified name of the entry point class. Overrides the class defined in the jar 
file manifest.</li>
+<li><code>parallelism</code> (optional): Positive integer value that specifies 
the desired parallelism for the job.</li>
         </ul>
       </td>
     </tr>
@@ -380,6 +402,12 @@
     "programArgs" : {
       "type" : "string"
     },
+    "programArgsList" : {
+      "type" : "array",
+      "items" : {
+        "type" : "string"
+      }
+    },
     "parallelism" : {
       "type" : "integer"
     },
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
new file mode 100644
index 00000000000..f1bf00a83f4
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Base class of {@link MessageParameters} for {@link JarRunHandler} and {@link JarPlanHandler}.
+ */
+abstract class JarMessageParameters extends MessageParameters {
+
+       final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+
+       final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
+
+       final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
+
+       final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter();
+
+       final ProgramArgQueryParameter programArgQueryParameter = new ProgramArgQueryParameter();
+
+       @Override
+       public Collection<MessagePathParameter<?>> getPathParameters() {
+               return Collections.singletonList(jarIdPathParameter);
+       }
+
+       @Override
+       public Collection<MessageQueryParameter<?>> getQueryParameters() {
+               return Collections.unmodifiableList(Arrays.asList(
+                       programArgsQueryParameter,
+                       programArgQueryParameter,
+                       entryClassQueryParameter,
+                       parallelismQueryParameter));
+       }
+}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 2f0631610fc..94a53643af1 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -18,44 +18,34 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.PackagedProgramUtils;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
-import org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
 import javax.annotation.Nonnull;
 
 import java.nio.file.Path;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
+import java.util.function.Function;
 
 import static java.util.Objects.requireNonNull;
-import static 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.tokenizeArguments;
-import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
 
 /**
  * This handler handles requests to fetch the plan for a jar.
  */
 public class JarPlanHandler
-               extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, 
JobPlanInfo, JarPlanMessageParameters> {
+               extends AbstractRestHandler<RestfulGateway, JarPlanRequestBody, 
JobPlanInfo, JarPlanMessageParameters> {
 
        private final Path jarDir;
 
@@ -63,47 +53,49 @@
 
        private final Executor executor;
 
+       private final Function<JobGraph, JobPlanInfo> planGenerator;
+
        public JarPlanHandler(
                        final CompletableFuture<String> localRestAddress,
                        final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        final Time timeout,
                        final Map<String, String> responseHeaders,
-                       final MessageHeaders<EmptyRequestBody, JobPlanInfo, 
JarPlanMessageParameters> messageHeaders,
+                       final MessageHeaders<JarPlanRequestBody, JobPlanInfo, 
JarPlanMessageParameters> messageHeaders,
                        final Path jarDir,
                        final Configuration configuration,
                        final Executor executor) {
+               this(
+                       localRestAddress, leaderRetriever, timeout, 
responseHeaders,
+                       messageHeaders, jarDir, configuration, executor,
+                       jobGraph -> new 
JobPlanInfo(JsonPlanGenerator.generatePlan(jobGraph)));
+       }
+
+       public JarPlanHandler(
+                       final CompletableFuture<String> localRestAddress,
+                       final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       final Time timeout,
+                       final Map<String, String> responseHeaders,
+                       final MessageHeaders<JarPlanRequestBody, JobPlanInfo, 
JarPlanMessageParameters> messageHeaders,
+                       final Path jarDir,
+                       final Configuration configuration,
+                       final Executor executor,
+                       final Function<JobGraph, JobPlanInfo> planGenerator) {
                super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
                this.jarDir = requireNonNull(jarDir);
                this.configuration = requireNonNull(configuration);
                this.executor = requireNonNull(executor);
+               this.planGenerator = planGenerator;
        }
 
        @Override
        protected CompletableFuture<JobPlanInfo> handleRequest(
-                       @Nonnull final HandlerRequest<EmptyRequestBody, 
JarPlanMessageParameters> request,
+                       @Nonnull final HandlerRequest<JarPlanRequestBody, 
JarPlanMessageParameters> request,
                        @Nonnull final RestfulGateway gateway) throws 
RestHandlerException {
-
-               final String jarId = 
request.getPathParameter(JarIdPathParameter.class);
-               final String entryClass = 
emptyToNull(HandlerRequestUtils.getQueryParameter(request, 
EntryClassQueryParameter.class));
-               final Integer parallelism = 
HandlerRequestUtils.getQueryParameter(request, ParallelismQueryParameter.class, 
ExecutionConfig.PARALLELISM_DEFAULT);
-               final List<String> programArgs = 
tokenizeArguments(HandlerRequestUtils.getQueryParameter(request, 
ProgramArgsQueryParameter.class));
-               final Path jarFile = jarDir.resolve(jarId);
+               final JarHandlerContext context = 
JarHandlerContext.fromRequest(request, jarDir, log);
 
                return CompletableFuture.supplyAsync(() -> {
-                       final JobGraph jobGraph;
-                       try {
-                               final PackagedProgram packagedProgram = new 
PackagedProgram(
-                                       jarFile.toFile(),
-                                       entryClass,
-                                       programArgs.toArray(new 
String[programArgs.size()]));
-                               jobGraph = 
PackagedProgramUtils.createJobGraph(packagedProgram, configuration, 
parallelism);
-                       } catch (final ProgramInvocationException e) {
-                               throw new CompletionException(new 
RestHandlerException(
-                                       e.getMessage(),
-                                       
HttpResponseStatus.INTERNAL_SERVER_ERROR,
-                                       e));
-                       }
-                       return new 
JobPlanInfo(JsonPlanGenerator.generatePlan(jobGraph));
+                       final JobGraph jobGraph = 
context.toJobGraph(configuration);
+                       return planGenerator.apply(jobGraph);
                }, executor);
        }
 }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHeaders.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHeaders.java
index 38fc705e84b..09311d8aca0 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHeaders.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHeaders.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.rest.HttpMethodWrapper;
-import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 
@@ -28,7 +27,7 @@
 /**
  * Message headers for {@link JarPlanHandler}.
  */
-public class JarPlanHeaders implements MessageHeaders<EmptyRequestBody, 
JobPlanInfo, JarPlanMessageParameters> {
+public class JarPlanHeaders implements MessageHeaders<JarPlanRequestBody, 
JobPlanInfo, JarPlanMessageParameters> {
 
        private static final JarPlanHeaders INSTANCE = new JarPlanHeaders();
 
@@ -43,8 +42,8 @@ public HttpResponseStatus getResponseStatusCode() {
        }
 
        @Override
-       public Class<EmptyRequestBody> getRequestClass() {
-               return EmptyRequestBody.class;
+       public Class<JarPlanRequestBody> getRequestClass() {
+               return JarPlanRequestBody.class;
        }
 
        @Override
@@ -68,6 +67,7 @@ public static JarPlanHeaders getInstance() {
 
        @Override
        public String getDescription() {
-               return "Returns the dataflow plan of a job contained in a jar 
previously uploaded via '" + JarUploadHeaders.URL + "'.";
+               return "Returns the dataflow plan of a job contained in a jar 
previously uploaded via '" + JarUploadHeaders.URL + "'. " +
+                       "Program arguments can be passed both via the JSON 
request (recommended) or query parameters.";
        }
 }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
index 8599a2ccf44..e7619b36361 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanMessageParameters.java
@@ -19,36 +19,9 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.rest.messages.MessageParameters;
-import org.apache.flink.runtime.rest.messages.MessagePathParameter;
-import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 
 /**
- * Message parameters for {@link JarPlanHandler}.
+ * {@link MessageParameters} for {@link JarPlanHandler}.
  */
-public class JarPlanMessageParameters extends MessageParameters {
-
-       public final JarIdPathParameter jarIdPathParameter = new 
JarIdPathParameter();
-
-       private final EntryClassQueryParameter entryClassQueryParameter = new 
EntryClassQueryParameter();
-
-       private final ParallelismQueryParameter parallelismQueryParameter = new 
ParallelismQueryParameter();
-
-       private final ProgramArgsQueryParameter programArgsQueryParameter = new 
ProgramArgsQueryParameter();
-
-       @Override
-       public Collection<MessagePathParameter<?>> getPathParameters() {
-               return Collections.singletonList(jarIdPathParameter);
-       }
-
-       @Override
-       public Collection<MessageQueryParameter<?>> getQueryParameters() {
-               return Collections.unmodifiableCollection(Arrays.asList(
-                       entryClassQueryParameter,
-                       parallelismQueryParameter,
-                       programArgsQueryParameter));
-       }
+class JarPlanMessageParameters extends JarMessageParameters {
 }
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanRequestBody.java
new file mode 100644
index 00000000000..8e209b41cc1
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanRequestBody.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * {@link RequestBody} for querying the plan from a jar.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JarPlanRequestBody extends JarRequestBody {
+       JarPlanRequestBody() {
+               super(null, null, null, null);
+       }
+
+       @JsonCreator
+       JarPlanRequestBody(
+               @Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String 
entryClassName,
+               @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String 
programArguments,
+               @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) 
List<String> programArgumentsList,
+               @Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer 
parallelism) {
+               super(entryClassName, programArguments, programArgumentsList, 
parallelism);
+       }
+}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java
new file mode 100644
index 00000000000..144ac05058c
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Base class for {@link RequestBody} for running a jar or querying the plan.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public abstract class JarRequestBody implements RequestBody {
+
+       static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
+       static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs";
+       static final String FIELD_NAME_PROGRAM_ARGUMENTS_LIST = 
"programArgsList";
+       static final String FIELD_NAME_PARALLELISM = "parallelism";
+
+       @JsonProperty(FIELD_NAME_ENTRY_CLASS)
+       @Nullable
+       private String entryClassName;
+
+       @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
+       @Nullable
+       private String programArguments;
+
+       @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST)
+       @Nullable
+       private List<String> programArgumentsList;
+
+       @JsonProperty(FIELD_NAME_PARALLELISM)
+       @Nullable
+       private Integer parallelism;
+
+       JarRequestBody() {
+               this(null, null, null, null);
+       }
+
+       @JsonCreator
+       JarRequestBody(
+               @Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String 
entryClassName,
+               @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String 
programArguments,
+               @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) 
List<String> programArgumentsList,
+               @Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer 
parallelism) {
+               this.entryClassName = entryClassName;
+               this.programArguments = programArguments;
+               this.programArgumentsList = programArgumentsList;
+               this.parallelism = parallelism;
+       }
+
+       @Nullable
+       @JsonIgnore
+       public String getEntryClassName() {
+               return entryClassName;
+       }
+
+       @Nullable
+       @JsonIgnore
+       public String getProgramArguments() {
+               return programArguments;
+       }
+
+       @Nullable
+       @JsonIgnore
+       public List<String> getProgramArgumentsList() {
+               return programArgumentsList;
+       }
+
+       @Nullable
+       @JsonIgnore
+       public Integer getParallelism() {
+               return parallelism;
+       }
+}
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index bae4ba86a0e..679dfe988ef 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -18,11 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.program.PackagedProgram;
-import org.apache.flink.client.program.PackagedProgramUtils;
-import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobClient;
 import org.apache.flink.runtime.client.ClientUtils;
@@ -34,29 +30,24 @@
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.function.SupplierWithException;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
-import org.slf4j.Logger;
-
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.net.InetSocketAddress;
-import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
 
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
 import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
-import static 
org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.tokenizeArguments;
 import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
 
 /**
@@ -91,39 +82,11 @@ public JarRunHandler(
        protected CompletableFuture<JarRunResponseBody> handleRequest(
                        @Nonnull final HandlerRequest<JarRunRequestBody, 
JarRunMessageParameters> request,
                        @Nonnull final DispatcherGateway gateway) throws 
RestHandlerException {
-
-               final JarRunRequestBody requestBody = request.getRequestBody();
-
-               final String pathParameter = 
request.getPathParameter(JarIdPathParameter.class);
-               final Path jarFile = jarDir.resolve(pathParameter);
-
-               final String entryClass = fromRequestBodyOrQueryParameter(
-                       emptyToNull(requestBody.getEntryClassName()),
-                       () -> emptyToNull(getQueryParameter(request, 
EntryClassQueryParameter.class)),
-                       null,
-                       log);
-
-               final List<String> programArgs = tokenizeArguments(
-                       fromRequestBodyOrQueryParameter(
-                               emptyToNull(requestBody.getProgramArguments()),
-                               () -> getQueryParameter(request, 
ProgramArgsQueryParameter.class),
-                               null,
-                               log));
-
-               final int parallelism = fromRequestBodyOrQueryParameter(
-                       requestBody.getParallelism(),
-                       () -> getQueryParameter(request, 
ParallelismQueryParameter.class),
-                       ExecutionConfig.PARALLELISM_DEFAULT,
-                       log);
+               final JarHandlerContext context = 
JarHandlerContext.fromRequest(request, jarDir, log);
 
                final SavepointRestoreSettings savepointRestoreSettings = 
getSavepointRestoreSettings(request);
 
-               final CompletableFuture<JobGraph> jobGraphFuture = 
getJobGraphAsync(
-                       jarFile,
-                       entryClass,
-                       programArgs,
-                       savepointRestoreSettings,
-                       parallelism);
+               final CompletableFuture<JobGraph> jobGraphFuture = 
getJobGraphAsync(context, savepointRestoreSettings);
 
                CompletableFuture<Integer> blobServerPortFuture = 
gateway.getBlobServerPort(timeout);
 
@@ -181,52 +144,11 @@ private SavepointRestoreSettings getSavepointRestoreSettings(
                return savepointRestoreSettings;
        }
 
-       /**
-        * Returns {@code requestValue} if it is not null, otherwise returns 
the query parameter value
-        * if it is not null, otherwise returns the default value.
-        */
-       private static <T> T fromRequestBodyOrQueryParameter(
-                       T requestValue,
-                       SupplierWithException<T, RestHandlerException> 
queryParameterExtractor,
-                       T defaultValue,
-                       Logger log) throws RestHandlerException {
-               if (requestValue != null) {
-                       return requestValue;
-               } else {
-                       T queryParameterValue = queryParameterExtractor.get();
-                       if (queryParameterValue != null) {
-                               log.warn("Configuring the job submission via 
query parameters is deprecated." +
-                                       " Please migrate to submitting a JSON 
request instead.");
-                               return queryParameterValue;
-                       } else {
-                               return defaultValue;
-                       }
-               }
-       }
-
        private CompletableFuture<JobGraph> getJobGraphAsync(
-                       final Path jarFile,
-                       @Nullable final String entryClass,
-                       final List<String> programArgs,
-                       final SavepointRestoreSettings savepointRestoreSettings,
-                       final int parallelism) {
-
+                       JarHandlerContext context,
+                       final SavepointRestoreSettings 
savepointRestoreSettings) {
                return CompletableFuture.supplyAsync(() -> {
-                       if (!Files.exists(jarFile)) {
-                               throw new CompletionException(new 
RestHandlerException(
-                                       String.format("Jar file %s does not 
exist", jarFile), HttpResponseStatus.BAD_REQUEST));
-                       }
-
-                       final JobGraph jobGraph;
-                       try {
-                               final PackagedProgram packagedProgram = new 
PackagedProgram(
-                                       jarFile.toFile(),
-                                       entryClass,
-                                       programArgs.toArray(new 
String[programArgs.size()]));
-                               jobGraph = 
PackagedProgramUtils.createJobGraph(packagedProgram, configuration, 
parallelism);
-                       } catch (final ProgramInvocationException e) {
-                               throw new CompletionException(e);
-                       }
+                       final JobGraph jobGraph = 
context.toJobGraph(configuration);
                        
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
                        return jobGraph;
                }, executor);
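
The fallback rule documented by the removed helper above (JSON body value first, then the query parameter, then a default) can be illustrated with a small standalone sketch. This is not Flink code: the class name `RequestValueFallback` and the method `fromBodyOrQuery` are hypothetical, a plain `java.util.function.Supplier` stands in for `SupplierWithException`, and the deprecation warning logged by the real helper is omitted.

```java
import java.util.function.Supplier;

final class RequestValueFallback {

    // Hypothetical stand-in for fromRequestBodyOrQueryParameter: the JSON body
    // value wins, then the query parameter, then the default.
    static <T> T fromBodyOrQuery(T bodyValue, Supplier<T> queryExtractor, T defaultValue) {
        if (bodyValue != null) {
            return bodyValue;
        }
        // The query parameter is only extracted when the body did not set the value.
        T queryValue = queryExtractor.get();
        return queryValue != null ? queryValue : defaultValue;
    }

    public static void main(String[] args) {
        Integer none = null;
        Integer fromBody = fromBodyOrQuery(4, () -> 8, 1);          // body value is used
        Integer fromQuery = fromBodyOrQuery(none, () -> 8, 1);      // query value is used
        Integer fromDefault = fromBodyOrQuery(none, () -> none, 1); // default is used
        System.out.println(fromBody + " " + fromQuery + " " + fromDefault);
    }
}
```

Taking the query value through a supplier keeps the lookup lazy, so a request that configures everything in the JSON body never touches the query parameters.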
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
index fa062e906bc..6085528d0ce 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
@@ -68,6 +68,7 @@ public static JarRunHeaders getInstance() {
 
        @Override
        public String getDescription() {
-               return "Submits a job by running a jar previously uploaded via 
'" + JarUploadHeaders.URL + "'.";
+               return "Submits a job by running a jar previously uploaded via 
'" + JarUploadHeaders.URL + "'. " +
+                       "Program arguments can be passed either via the JSON request (recommended) or via query parameters.";
        }
 }
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
index 78267db22a0..ed69b81017f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
@@ -19,9 +19,9 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.rest.messages.MessageParameters;
-import org.apache.flink.runtime.rest.messages.MessagePathParameter;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -29,32 +29,18 @@
 /**
  * {@link MessageParameters} for {@link JarRunHandler}.
  */
-public class JarRunMessageParameters extends MessageParameters {
+public class JarRunMessageParameters extends JarMessageParameters {
 
-       public final JarIdPathParameter jarIdPathParameter = new 
JarIdPathParameter();
+       final AllowNonRestoredStateQueryParameter 
allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
 
-       public final ProgramArgsQueryParameter programArgsQueryParameter = new 
ProgramArgsQueryParameter();
-
-       public final EntryClassQueryParameter entryClassQueryParameter = new 
EntryClassQueryParameter();
-
-       public final ParallelismQueryParameter parallelismQueryParameter = new 
ParallelismQueryParameter();
-
-       public final AllowNonRestoredStateQueryParameter 
allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
-
-       public final SavepointPathQueryParameter savepointPathQueryParameter = 
new SavepointPathQueryParameter();
-
-       @Override
-       public Collection<MessagePathParameter<?>> getPathParameters() {
-               return Collections.singletonList(jarIdPathParameter);
-       }
+       final SavepointPathQueryParameter savepointPathQueryParameter = new 
SavepointPathQueryParameter();
 
        @Override
        public Collection<MessageQueryParameter<?>> getQueryParameters() {
-               return Collections.unmodifiableCollection(Arrays.asList(
-                       programArgsQueryParameter,
-                       entryClassQueryParameter,
-                       parallelismQueryParameter,
+               Collection<MessageQueryParameter<?>> pars = new 
ArrayList<>(Arrays.asList(
                        allowNonRestoredStateQueryParameter,
                        savepointPathQueryParameter));
+               pars.addAll(super.getQueryParameters());
+               return Collections.unmodifiableCollection(pars);
        }
 }
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
index b30ae6113ef..9e4ee0f8c2a 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
@@ -27,30 +27,16 @@
 
 import javax.annotation.Nullable;
 
+import java.util.List;
+
 /**
  * {@link RequestBody} for running a jar.
  */
 @JsonInclude(JsonInclude.Include.NON_NULL)
-public class JarRunRequestBody implements RequestBody {
-
-       private static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
-       private static final String FIELD_NAME_PROGRAM_ARGUMENTS = 
"programArgs";
-       private static final String FIELD_NAME_PARALLELISM = "parallelism";
+public class JarRunRequestBody extends JarRequestBody {
        private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = 
"allowNonRestoredState";
        private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
 
-       @JsonProperty(FIELD_NAME_ENTRY_CLASS)
-       @Nullable
-       private String entryClassName;
-
-       @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
-       @Nullable
-       private String programArguments;
-
-       @JsonProperty(FIELD_NAME_PARALLELISM)
-       @Nullable
-       private Integer parallelism;
-
        @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
        @Nullable
        private Boolean allowNonRestoredState;
@@ -60,41 +46,22 @@
        private String savepointPath;
 
        public JarRunRequestBody() {
-               this(null, null, null, null, null);
+               this(null, null, null, null, null, null);
        }
 
        @JsonCreator
        public JarRunRequestBody(
                        @Nullable @JsonProperty(FIELD_NAME_ENTRY_CLASS) String 
entryClassName,
                        @Nullable @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) 
String programArguments,
+                       @Nullable 
@JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS_LIST) List<String> 
programArgumentsList,
                        @Nullable @JsonProperty(FIELD_NAME_PARALLELISM) Integer 
parallelism,
                        @Nullable 
@JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean 
allowNonRestoredState,
                        @Nullable @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) 
String savepointPath) {
-               this.entryClassName = entryClassName;
-               this.programArguments = programArguments;
-               this.parallelism = parallelism;
+               super(entryClassName, programArguments, programArgumentsList, 
parallelism);
                this.allowNonRestoredState = allowNonRestoredState;
                this.savepointPath = savepointPath;
        }
 
-       @Nullable
-       @JsonIgnore
-       public String getEntryClassName() {
-               return entryClassName;
-       }
-
-       @Nullable
-       @JsonIgnore
-       public String getProgramArguments() {
-               return programArguments;
-       }
-
-       @Nullable
-       @JsonIgnore
-       public Integer getParallelism() {
-               return parallelism;
-       }
-
        @Nullable
        @JsonIgnore
        public Boolean getAllowNonRestoredState() {
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgQueryParameter.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgQueryParameter.java
new file mode 100644
index 00000000000..769da5b7a4f
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgQueryParameter.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageParameter;
+
+import java.io.File;
+
+/**
+ * Query parameter specifying one or more arguments for the program.
+ * @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, 
String, String...)
+ */
+public class ProgramArgQueryParameter extends StringQueryParameter {
+       static final String PROGRAM_ARG_PARAMETER_NAME = "programArg";
+
+       public ProgramArgQueryParameter() {
+               super(PROGRAM_ARG_PARAMETER_NAME, 
MessageParameter.MessageParameterRequisiteness.OPTIONAL);
+       }
+
+       @Override
+       public String getDescription() {
+               return "Comma-separated list of program arguments.";
+       }
+}
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
index d4b0a4fdd23..2cb77e59417 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
@@ -22,8 +22,10 @@
 
 /**
  * Query parameter specifying the arguments for the program.
+ * @deprecated Please use {@link JarRequestBody#FIELD_NAME_PROGRAM_ARGUMENTS_LIST} instead.
  * @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, 
String, String...)
  */
+@Deprecated
 public class ProgramArgsQueryParameter extends StringQueryParameter {
 
        public ProgramArgsQueryParameter() {
@@ -32,6 +34,8 @@ public ProgramArgsQueryParameter() {
 
        @Override
        public String getDescription() {
-               return "String value that specifies the arguments for the 
program or plan.";
+               return String.format("Deprecated, please use '%s' instead. " +
+                       "String value that specifies the arguments for the 
program or plan",
+                       ProgramArgQueryParameter.PROGRAM_ARG_PARAMETER_NAME);
        }
 }
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
index 3b1b7449303..9026cf00c5e 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
@@ -18,14 +18,43 @@
 
 package org.apache.flink.runtime.webmonitor.handlers.utils;
 
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.webmonitor.handlers.EntryClassQueryParameter;
+import org.apache.flink.runtime.webmonitor.handlers.JarIdPathParameter;
+import org.apache.flink.runtime.webmonitor.handlers.JarRequestBody;
+import org.apache.flink.runtime.webmonitor.handlers.ParallelismQueryParameter;
+import org.apache.flink.runtime.webmonitor.handlers.ProgramArgQueryParameter;
+import org.apache.flink.runtime.webmonitor.handlers.ProgramArgsQueryParameter;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.slf4j.Logger;
+
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.nio.file.Files;
+import java.nio.file.Path;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CompletionException;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
 /**
  * Utils for jar handlers.
  *
@@ -34,6 +63,93 @@
  */
 public class JarHandlerUtils {
 
+       /** Standard jar handler parameters parsed from request. */
+       public static class JarHandlerContext {
+               private final Path jarFile;
+               private final String entryClass;
+               private final List<String> programArgs;
+               private final int parallelism;
+
+               private JarHandlerContext(Path jarFile, String entryClass, 
List<String> programArgs, int parallelism) {
+                       this.jarFile = jarFile;
+                       this.entryClass = entryClass;
+                       this.programArgs = programArgs;
+                       this.parallelism = parallelism;
+               }
+
+               public static <R extends JarRequestBody> JarHandlerContext 
fromRequest(
+                               @Nonnull final HandlerRequest<R, ?> request,
+                               @Nonnull final Path jarDir,
+                               @Nonnull final Logger log) throws 
RestHandlerException {
+                       final JarRequestBody requestBody = 
request.getRequestBody();
+
+                       final String pathParameter = 
request.getPathParameter(JarIdPathParameter.class);
+                       Path jarFile = jarDir.resolve(pathParameter);
+
+                       String entryClass = fromRequestBodyOrQueryParameter(
+                               emptyToNull(requestBody.getEntryClassName()),
+                               () -> emptyToNull(getQueryParameter(request, 
EntryClassQueryParameter.class)),
+                               null,
+                               log);
+
+                       List<String> programArgs = 
JarHandlerUtils.getProgramArgs(request, log);
+
+                       int parallelism = fromRequestBodyOrQueryParameter(
+                               requestBody.getParallelism(),
+                               () -> getQueryParameter(request, 
ParallelismQueryParameter.class),
+                               ExecutionConfig.PARALLELISM_DEFAULT,
+                               log);
+
+                       return new JarHandlerContext(jarFile, entryClass, 
programArgs, parallelism);
+               }
+
+               public JobGraph toJobGraph(Configuration configuration) {
+                       if (!Files.exists(jarFile)) {
+                               throw new CompletionException(new 
RestHandlerException(
+                                       String.format("Jar file %s does not 
exist", jarFile), HttpResponseStatus.BAD_REQUEST));
+                       }
+
+                       try {
+                               final PackagedProgram packagedProgram = new 
PackagedProgram(
+                                       jarFile.toFile(),
+                                       entryClass,
+                                       programArgs.toArray(new String[0]));
+                               return 
PackagedProgramUtils.createJobGraph(packagedProgram, configuration, 
parallelism);
+                       } catch (final ProgramInvocationException e) {
+                               throw new CompletionException(e);
+                       }
+               }
+       }
+
+       /** Parse program arguments in jar run or plan request. */
+       private static <R extends JarRequestBody, M extends MessageParameters> 
List<String> getProgramArgs(
+                       HandlerRequest<R, M> request, Logger log) throws 
RestHandlerException {
+               JarRequestBody requestBody = request.getRequestBody();
+               @SuppressWarnings("deprecation")
+               List<String> programArgs = tokenizeArguments(
+                       fromRequestBodyOrQueryParameter(
+                               emptyToNull(requestBody.getProgramArguments()),
+                               () -> getQueryParameter(request, 
ProgramArgsQueryParameter.class),
+                               null,
+                               log));
+               List<String> programArgsList =
+                       fromRequestBodyOrQueryParameter(
+                               requestBody.getProgramArgumentsList(),
+                               () -> 
request.getQueryParameter(ProgramArgQueryParameter.class),
+                               null,
+                               log);
+               if (!programArgsList.isEmpty()) {
+                       if (!programArgs.isEmpty()) {
+                               throw new RestHandlerException(
+                                       "Confusing request: both programArgs and programArgsList are specified. Please use only programArgsList.",
+                                       HttpResponseStatus.BAD_REQUEST);
+                       }
+                       return programArgsList;
+               } else {
+                       return programArgs;
+               }
+       }
+
        private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = 
Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");
 
        /**
@@ -48,7 +164,8 @@
         *
         * <strong>WARNING: </strong>This method does not respect escaped 
quotes.
         */
-       public static List<String> tokenizeArguments(@Nullable final String 
args) {
+       @VisibleForTesting
+       static List<String> tokenizeArguments(@Nullable final String args) {
                if (args == null) {
                        return Collections.emptyList();
                }
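
The way `getProgramArgs` reconciles the deprecated single-string form with the new list form can be sketched in isolation. This is a hedged illustration, not the Flink implementation: `ProgramArgsSketch`, `tokenize`, and `resolve` are hypothetical names, `IllegalArgumentException` stands in for a `RestHandlerException` with `BAD_REQUEST`, and the body of `tokenize` is an assumption, since the diff shows only the pattern and the null check of `tokenizeArguments`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ProgramArgsSketch {

    // Same regular expression as ARGUMENTS_TOKENIZE_PATTERN in the diff.
    private static final Pattern TOKENIZE =
        Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");

    // Assumed tokenization of the legacy single-string form: every match is
    // trimmed and its quote characters are dropped (escaped quotes are not handled).
    static List<String> tokenize(String args) {
        if (args == null) {
            return Collections.emptyList();
        }
        List<String> tokens = new ArrayList<>();
        Matcher matcher = TOKENIZE.matcher(args);
        while (matcher.find()) {
            tokens.add(matcher.group().trim().replace("\"", "").replace("'", ""));
        }
        return tokens;
    }

    // Mirrors the conflict rule: the list form is used when present, and
    // supplying both forms is rejected.
    static List<String> resolve(String legacyArgs, List<String> argList) {
        List<String> fromString = tokenize(legacyArgs);
        List<String> fromList = argList == null ? Collections.emptyList() : argList;
        if (!fromList.isEmpty()) {
            if (!fromString.isEmpty()) {
                throw new IllegalArgumentException(
                    "both the string form and the list form of the program arguments are specified");
            }
            return fromList;
        }
        return fromString;
    }

    public static void main(String[] args) {
        System.out.println(resolve("--host localhost --port 1234", null));
        System.out.println(resolve(null, Arrays.asList("--name", "John Doe")));
    }
}
```

The list form sidesteps tokenization entirely, so an argument containing spaces or quotes arrives unchanged, which is the motivation for preferring it over the string form.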
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
new file mode 100644
index 00000000000..5c0ad507b4b
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarHandlerParameterTest.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.MessageParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+import org.apache.flink.util.TestLogger;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.fail;
+
+/** Base test class for jar request handlers. */
+public abstract class JarHandlerParameterTest
+       <REQB extends JarRequestBody, M extends JarMessageParameters> extends 
TestLogger {
+       enum ProgramArgsParType {
+               String,
+               List,
+               Both
+       }
+
+       static final String[] PROG_ARGS = new String[] {"--host", "localhost", 
"--port", "1234"};
+       static final int PARALLELISM = 4;
+
+       @ClassRule
+       public static final TemporaryFolder TMP = new TemporaryFolder();
+
+       @ClassRule
+       public static final BlobServerResource BLOB_SERVER_RESOURCE = new 
BlobServerResource();
+
+       static final AtomicReference<JobGraph> 
LAST_SUBMITTED_JOB_GRAPH_REFERENCE = new AtomicReference<>();
+
+       static TestingDispatcherGateway restfulGateway;
+       static Path jarDir;
+       static GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () 
-> CompletableFuture.completedFuture(restfulGateway);
+       static CompletableFuture<String> localAddressFuture = 
CompletableFuture.completedFuture("shazam://localhost:12345");
+       static Time timeout = Time.seconds(10);
+       static Map<String, String> responseHeaders = Collections.emptyMap();
+       static Executor executor = TestingUtils.defaultExecutor();
+
+       private static Path jarWithManifest;
+       private static Path jarWithoutManifest;
+
+       static void init() throws Exception {
+               jarDir = TMP.newFolder().toPath();
+
+               // properties are set by the surefire plugin
+               final String parameterProgramJarName = 
System.getProperty("parameterJarName") + ".jar";
+               final String parameterProgramWithoutManifestJarName = 
System.getProperty("parameterJarWithoutManifestName") + ".jar";
+               final Path jarLocation = 
Paths.get(System.getProperty("targetDir"));
+
+               jarWithManifest = Files.copy(
+                       jarLocation.resolve(parameterProgramJarName),
+                       jarDir.resolve("program-with-manifest.jar"));
+               jarWithoutManifest = Files.copy(
+                       
jarLocation.resolve(parameterProgramWithoutManifestJarName),
+                       jarDir.resolve("program-without-manifest.jar"));
+
+               restfulGateway = new TestingDispatcherGateway.Builder()
+                       
.setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort())
+                       .setSubmitFunction(jobGraph -> {
+                               
LAST_SUBMITTED_JOB_GRAPH_REFERENCE.set(jobGraph);
+                               return 
CompletableFuture.completedFuture(Acknowledge.get());
+                       })
+                       .build();
+
+               gatewayRetriever = () -> 
CompletableFuture.completedFuture(restfulGateway);
+               localAddressFuture = 
CompletableFuture.completedFuture("shazam://localhost:12345");
+               timeout = Time.seconds(10);
+               responseHeaders = Collections.emptyMap();
+               executor = TestingUtils.defaultExecutor();
+       }
+
+       @Before
+       public void reset() {
+               ParameterProgram.actualArguments = null;
+       }
+
+       @Test
+       public void testDefaultParameters() throws Exception {
+               // baseline, ensure that reasonable defaults are chosen
+               handleRequest(createRequest(
+                       getDefaultJarRequestBody(),
+                       getUnresolvedJarMessageParameters(),
+                       getUnresolvedJarMessageParameters(),
+                       jarWithManifest));
+               validateDefaultGraph();
+       }
+
+       @Test
+       public void testConfigurationViaQueryParametersWithProgArgsAsString() 
throws Exception {
+               testConfigurationViaQueryParameters(ProgramArgsParType.String);
+       }
+
+       @Test
+       public void testConfigurationViaQueryParametersWithProgArgsAsList() 
throws Exception {
+               testConfigurationViaQueryParameters(ProgramArgsParType.List);
+       }
+
+       @Test
+       public void 
testConfigurationViaQueryParametersFailWithProgArgsAsStringAndList() throws 
Exception {
+               try {
+                       
testConfigurationViaQueryParameters(ProgramArgsParType.Both);
+                       fail("RestHandlerException is expected");
+               } catch (RestHandlerException e) {
+                       assertEquals(HttpResponseStatus.BAD_REQUEST, 
e.getHttpResponseStatus());
+               }
+       }
+
+       private void testConfigurationViaQueryParameters(ProgramArgsParType 
programArgsParType) throws Exception {
+               // configure submission via query parameters
+               handleRequest(createRequest(
+                       getDefaultJarRequestBody(),
+                       getJarMessageParameters(programArgsParType),
+                       getUnresolvedJarMessageParameters(),
+                       jarWithoutManifest));
+               validateGraph();
+       }
+
+       @Test
+       public void testConfigurationViaJsonRequestWithProgArgsAsString() 
throws Exception {
+               testConfigurationViaJsonRequest(ProgramArgsParType.String);
+       }
+
+       @Test
+       public void testConfigurationViaJsonRequestWithProgArgsAsList() throws 
Exception {
+               testConfigurationViaJsonRequest(ProgramArgsParType.List);
+       }
+
+       @Test
+       public void 
testConfigurationViaJsonRequestFailWithProgArgsAsStringAndList() throws 
Exception {
+               try {
+                       
testConfigurationViaJsonRequest(ProgramArgsParType.Both);
+                       fail("RestHandlerException is expected");
+               } catch (RestHandlerException e) {
+                       assertEquals(HttpResponseStatus.BAD_REQUEST, 
e.getHttpResponseStatus());
+               }
+       }
+
+       private void testConfigurationViaJsonRequest(ProgramArgsParType 
programArgsParType) throws Exception {
+               handleRequest(createRequest(
+                       getJarRequestBody(programArgsParType),
+                       getUnresolvedJarMessageParameters(),
+                       getUnresolvedJarMessageParameters(),
+                       jarWithoutManifest
+               ));
+               validateGraph();
+       }
+
+       @Test
+       public void testParameterPrioritizationWithProgArgsAsString() throws 
Exception {
+               testParameterPrioritization(ProgramArgsParType.String);
+       }
+
+       @Test
+       public void testParameterPrioritizationWithProgArgsAsList() throws 
Exception {
+               testParameterPrioritization(ProgramArgsParType.List);
+       }
+
+       @Test
+       public void testFailIfProgArgsAreAsStringAndAsList() throws Exception {
+               try {
+                       testParameterPrioritization(ProgramArgsParType.Both);
+                       fail("RestHandlerException is expected");
+               } catch (RestHandlerException e) {
+                       assertEquals(HttpResponseStatus.BAD_REQUEST, 
e.getHttpResponseStatus());
+               }
+       }
+
+       private void testParameterPrioritization(ProgramArgsParType 
programArgsParType) throws Exception {
+               // configure submission via query parameters and JSON request, 
JSON should be prioritized
+               handleRequest(createRequest(
+                       getJarRequestBody(programArgsParType),
+                       getWrongJarMessageParameters(programArgsParType),
+                       getUnresolvedJarMessageParameters(),
+                       jarWithoutManifest));
+               validateGraph();
+       }
+
+       static String getProgramArgsString(ProgramArgsParType 
programArgsParType) {
+               return programArgsParType == ProgramArgsParType.String || 
programArgsParType == ProgramArgsParType.Both
+                       ? String.join(" ", PROG_ARGS) : null;
+       }
+
+       static List<String> getProgramArgsList(ProgramArgsParType 
programArgsParType) {
+               return programArgsParType == ProgramArgsParType.List || 
programArgsParType == ProgramArgsParType.Both
+                       ? Arrays.asList(PROG_ARGS) : null;
+       }
+
+       private static <REQB extends JarRequestBody, M extends 
JarMessageParameters>
+       HandlerRequest<REQB, M> createRequest(
+               REQB requestBody, M parameters, M unresolvedMessageParameters, 
Path jar)
+               throws HandlerRequestException {
+
+               final Map<String, List<String>> queryParameterAsMap = 
parameters.getQueryParameters().stream()
+                       .filter(MessageParameter::isResolved)
+                       .collect(Collectors.toMap(
+                               MessageParameter::getKey,
+                               JarHandlerParameterTest::getValuesAsString
+                       ));
+
+               return new HandlerRequest<>(
+                       requestBody,
+                       unresolvedMessageParameters,
+                       Collections.singletonMap(JarIdPathParameter.KEY, jar.getFileName().toString()),
+                       queryParameterAsMap,
+                       Collections.emptyList()
+               );
+       }
+
+       private static <X> List<String> getValuesAsString(MessageQueryParameter<X> parameter) {
+               final List<X> values = parameter.getValue();
+               return values.stream().map(parameter::convertValueToString).collect(Collectors.toList());
+       }
+
+       abstract M getUnresolvedJarMessageParameters();
+
+       abstract M getJarMessageParameters(ProgramArgsParType programArgsParType);
+
+       abstract M getWrongJarMessageParameters(ProgramArgsParType programArgsParType);
+
+       abstract REQB getDefaultJarRequestBody();
+
+       abstract REQB getJarRequestBody(ProgramArgsParType programArgsParType);
+
+       abstract void handleRequest(HandlerRequest<REQB, M> request) throws Exception;
+
+       JobGraph validateDefaultGraph() {
+               JobGraph jobGraph = LAST_SUBMITTED_JOB_GRAPH_REFERENCE.getAndSet(null);
+               Assert.assertEquals(0, ParameterProgram.actualArguments.length);
+               Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, getExecutionConfig(jobGraph).getParallelism());
+               return jobGraph;
+       }
+
+       JobGraph validateGraph() {
+               JobGraph jobGraph = LAST_SUBMITTED_JOB_GRAPH_REFERENCE.getAndSet(null);
+               Assert.assertArrayEquals(PROG_ARGS, ParameterProgram.actualArguments);
+               Assert.assertEquals(PARALLELISM, getExecutionConfig(jobGraph).getParallelism());
+               return jobGraph;
+       }
+
+       private static ExecutionConfig getExecutionConfig(JobGraph jobGraph) {
+               ExecutionConfig executionConfig;
+               try {
+                       executionConfig = jobGraph.getSerializedExecutionConfig().deserializeValue(ParameterProgram.class.getClassLoader());
+               } catch (Exception e) {
+                       throw new AssertionError("Exception while deserializing ExecutionConfig.", e);
+               }
+               return executionConfig;
+       }
+}
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerParameterTest.java
new file mode 100644
index 00000000000..1e496bff897
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandlerParameterTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+
+import org.junit.BeforeClass;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for the parameter handling of the {@link JarPlanHandler}.
+ */
+public class JarPlanHandlerParameterTest extends JarHandlerParameterTest<JarPlanRequestBody, JarPlanMessageParameters> {
+       private static JarPlanHandler handler;
+
+       @BeforeClass
+       public static void setup() throws Exception {
+               init();
+               handler = new JarPlanHandler(
+                       localAddressFuture,
+                       gatewayRetriever,
+                       timeout,
+                       responseHeaders,
+                       JarPlanHeaders.getInstance(),
+                       jarDir,
+                       new Configuration(),
+                       executor,
+                       jobGraph -> {
+                               LAST_SUBMITTED_JOB_GRAPH_REFERENCE.set(jobGraph);
+                               return new JobPlanInfo(JsonPlanGenerator.generatePlan(jobGraph));
+                       });
+       }
+
+       @Override
+       JarPlanMessageParameters getUnresolvedJarMessageParameters() {
+               return handler.getMessageHeaders().getUnresolvedMessageParameters();
+       }
+
+       @Override
+       JarPlanMessageParameters getJarMessageParameters(ProgramArgsParType programArgsParType) {
+               final JarPlanMessageParameters parameters = getUnresolvedJarMessageParameters();
+               parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
+               parameters.parallelismQueryParameter.resolve(Collections.singletonList(PARALLELISM));
+               if (programArgsParType == ProgramArgsParType.String ||
+                       programArgsParType == ProgramArgsParType.Both) {
+                       parameters.programArgsQueryParameter.resolve(Collections.singletonList(String.join(" ", PROG_ARGS)));
+               }
+               if (programArgsParType == ProgramArgsParType.List ||
+                       programArgsParType == ProgramArgsParType.Both) {
+                       parameters.programArgQueryParameter.resolve(Arrays.asList(PROG_ARGS));
+               }
+               return parameters;
+       }
+
+       @Override
+       JarPlanMessageParameters getWrongJarMessageParameters(ProgramArgsParType programArgsParType) {
+               List<String> wrongArgs = Arrays.stream(PROG_ARGS).map(a -> a + "wrong").collect(Collectors.toList());
+               String argsWrongStr = String.join(" ", wrongArgs);
+               final JarPlanMessageParameters parameters = getUnresolvedJarMessageParameters();
+               parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
+               parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
+               if (programArgsParType == ProgramArgsParType.String || programArgsParType == ProgramArgsParType.Both) {
+                       parameters.programArgsQueryParameter.resolve(Collections.singletonList(argsWrongStr));
+               }
+               if (programArgsParType == ProgramArgsParType.List ||
+                       programArgsParType == ProgramArgsParType.Both) {
+                       parameters.programArgQueryParameter.resolve(wrongArgs);
+               }
+               return parameters;
+       }
+
+       @Override
+       JarPlanRequestBody getDefaultJarRequestBody() {
+               return new JarPlanRequestBody();
+       }
+
+       @Override
+       JarPlanRequestBody getJarRequestBody(ProgramArgsParType programArgsParType) {
+               return new JarPlanRequestBody(
+                       ParameterProgram.class.getCanonicalName(),
+                       getProgramArgsString(programArgsParType),
+                       getProgramArgsList(programArgsParType),
+                       PARALLELISM);
+       }
+
+       @Override
+       void handleRequest(HandlerRequest<JarPlanRequestBody, JarPlanMessageParameters> request)
+               throws Exception {
+               handler.handleRequest(request, restfulGateway).get();
+       }
+}
+
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
index 8bb358a543c..c36f7c7802e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
@@ -18,89 +18,39 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.BlobServerOptions;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
-import org.apache.flink.runtime.rest.handler.HandlerRequestException;
-import org.apache.flink.runtime.rest.messages.MessageParameter;
-import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.util.BlobServerResource;
 import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.function.SupplierWithException;
-import org.apache.flink.util.function.ThrowingConsumer;
 
 import org.junit.Assert;
-import org.junit.Before;
 import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 /**
  * Tests for the parameter handling of the {@link JarRunHandler}.
  */
-public class JarRunHandlerParameterTest extends TestLogger {
+public class JarRunHandlerParameterTest extends JarHandlerParameterTest<JarRunRequestBody, JarRunMessageParameters> {
+       private static final boolean ALLOW_NON_RESTORED_STATE_QUERY = true;
+       private static final String RESTORE_PATH = "/foo/bar";
 
-       @ClassRule
-       public static final TemporaryFolder TMP = new TemporaryFolder();
-
-       @ClassRule
-       public static final BlobServerResource BLOB_SERVER_RESOURCE = new 
BlobServerResource();
-
-       private static final AtomicReference<JobGraph> 
lastSubmittedJobGraphReference = new AtomicReference<>();
        private static JarRunHandler handler;
-       private static Path jarWithManifest;
-       private static Path jarWithoutManifest;
-       private static TestingDispatcherGateway restfulGateway;
 
        @BeforeClass
        public static void setup() throws Exception {
-               Path jarDir = TMP.newFolder().toPath();
-
-               // properties are set property by surefire plugin
-               final String parameterProgramJarName = 
System.getProperty("parameterJarName") + ".jar";
-               final String parameterProgramWithoutManifestJarName = 
System.getProperty("parameterJarWithoutManifestName") + ".jar";
-               final Path jarLocation = 
Paths.get(System.getProperty("targetDir"));
-
-               jarWithManifest = Files.copy(
-                       jarLocation.resolve(parameterProgramJarName),
-                       jarDir.resolve("program-with-manifest.jar"));
-               jarWithoutManifest = Files.copy(
-                       
jarLocation.resolve(parameterProgramWithoutManifestJarName),
-                       jarDir.resolve("program-without-manifest.jar"));
-
-               Configuration config = new Configuration();
-               config.setString(BlobServerOptions.STORAGE_DIRECTORY,
-                       TMP.newFolder().getAbsolutePath());
-
-               restfulGateway = new TestingDispatcherGateway.Builder()
-                       
.setBlobServerPort(BLOB_SERVER_RESOURCE.getBlobServerPort())
-                       .setSubmitFunction(jobGraph -> {
-                               lastSubmittedJobGraphReference.set(jobGraph);
-                               return 
CompletableFuture.completedFuture(Acknowledge.get());
-                       })
-                       .build();
+               init();
                final GatewayRetriever<TestingDispatcherGateway> gatewayRetriever = () -> CompletableFuture.completedFuture(restfulGateway);
                final CompletableFuture<String> localAddressFuture = CompletableFuture.completedFuture("shazam://localhost:12345");
                final Time timeout = Time.seconds(10);
@@ -118,197 +68,87 @@ public static void setup() throws Exception {
                        executor);
        }
 
-       @Before
-       public void reset() {
-               ParameterProgram.actualArguments = null;
+       @Override
+       JarRunMessageParameters getUnresolvedJarMessageParameters() {
+               return handler.getMessageHeaders().getUnresolvedMessageParameters();
        }
 
-       @Test
-       public void testDefaultParameters() throws Exception {
-               // baseline, ensure that reasonable defaults are chosen
-               sendRequestAndValidateGraph(
-                       handler,
-                       restfulGateway,
-                       () -> createRequest(
-                               new JarRunRequestBody(),
-                               
JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
-                               jarWithManifest
-                       ),
-                       jobGraph -> {
-                               Assert.assertEquals(0, 
ParameterProgram.actualArguments.length);
-
-                               
Assert.assertEquals(ExecutionConfig.PARALLELISM_DEFAULT, 
getExecutionConfig(jobGraph).getParallelism());
-
-                               final SavepointRestoreSettings 
savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
-                               
Assert.assertFalse(savepointRestoreSettings.allowNonRestoredState());
-                               
Assert.assertNull(savepointRestoreSettings.getRestorePath());
-                       }
-               );
-       }
-
-       @Test
-       public void testConfigurationViaQueryParameters() throws Exception {
-               // configure submission via query parameters
-               sendRequestAndValidateGraph(
-                       handler,
-                       restfulGateway,
-                       () -> {
-                               final JarRunMessageParameters parameters = 
JarRunHeaders.getInstance().getUnresolvedMessageParameters();
-                               
parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(true));
-                               
parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/foo/bar"));
-                               
parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
-                               
parameters.parallelismQueryParameter.resolve(Collections.singletonList(4));
-                               
parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host 
localhost --port 1234"));
-
-                               return createRequest(
-                                       new JarRunRequestBody(),
-                                       parameters,
-                                       jarWithoutManifest
-                               );
-                       },
-                       jobGraph -> {
-                               Assert.assertEquals(4, 
ParameterProgram.actualArguments.length);
-                               Assert.assertEquals("--host", 
ParameterProgram.actualArguments[0]);
-                               Assert.assertEquals("localhost", 
ParameterProgram.actualArguments[1]);
-                               Assert.assertEquals("--port", 
ParameterProgram.actualArguments[2]);
-                               Assert.assertEquals("1234", 
ParameterProgram.actualArguments[3]);
-
-                               Assert.assertEquals(4, 
getExecutionConfig(jobGraph).getParallelism());
-
-                               final SavepointRestoreSettings 
savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
-                               
Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
-                               Assert.assertEquals("/foo/bar", 
savepointRestoreSettings.getRestorePath());
-                       }
-               );
+       @Override
+       JarRunMessageParameters getJarMessageParameters(ProgramArgsParType programArgsParType) {
+               final JarRunMessageParameters parameters = getUnresolvedJarMessageParameters();
+               parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(ALLOW_NON_RESTORED_STATE_QUERY));
+               parameters.savepointPathQueryParameter.resolve(Collections.singletonList(RESTORE_PATH));
+               parameters.entryClassQueryParameter.resolve(Collections.singletonList(ParameterProgram.class.getCanonicalName()));
+               parameters.parallelismQueryParameter.resolve(Collections.singletonList(PARALLELISM));
+               if (programArgsParType == ProgramArgsParType.String ||
+                       programArgsParType == ProgramArgsParType.Both) {
+                       parameters.programArgsQueryParameter.resolve(Collections.singletonList(String.join(" ", PROG_ARGS)));
+               }
+               if (programArgsParType == ProgramArgsParType.List ||
+                       programArgsParType == ProgramArgsParType.Both) {
+                       parameters.programArgQueryParameter.resolve(Arrays.asList(PROG_ARGS));
+               }
+               return parameters;
        }
 
-       @Test
-       public void testConfigurationViaJsonRequest() throws Exception {
-               sendRequestAndValidateGraph(
-                       handler,
-                       restfulGateway,
-                       () -> {
-                               final JarRunRequestBody jsonRequest = new 
JarRunRequestBody(
-                                       
ParameterProgram.class.getCanonicalName(),
-                                       "--host localhost --port 1234",
-                                       4,
-                                       true,
-                                       "/foo/bar"
-                               );
-
-                               return createRequest(
-                                       jsonRequest,
-                                       
JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
-                                       jarWithoutManifest
-                               );
-                       },
-                       jobGraph -> {
-                               Assert.assertEquals(4, 
ParameterProgram.actualArguments.length);
-                               Assert.assertEquals("--host", 
ParameterProgram.actualArguments[0]);
-                               Assert.assertEquals("localhost", 
ParameterProgram.actualArguments[1]);
-                               Assert.assertEquals("--port", 
ParameterProgram.actualArguments[2]);
-                               Assert.assertEquals("1234", 
ParameterProgram.actualArguments[3]);
-
-                               Assert.assertEquals(4, 
getExecutionConfig(jobGraph).getParallelism());
-
-                               final SavepointRestoreSettings 
savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
-                               
Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
-                               Assert.assertEquals("/foo/bar", 
savepointRestoreSettings.getRestorePath());
-                       }
-               );
+       @Override
+       JarRunMessageParameters getWrongJarMessageParameters(ProgramArgsParType programArgsParType) {
+               List<String> wrongArgs = Arrays.stream(PROG_ARGS).map(a -> a + "wrong").collect(Collectors.toList());
+               String argsWrongStr = String.join(" ", wrongArgs);
+               final JarRunMessageParameters parameters = getUnresolvedJarMessageParameters();
+               parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false));
+               parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh"));
+               parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
+               parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
+               if (programArgsParType == ProgramArgsParType.String ||
+                       programArgsParType == ProgramArgsParType.Both) {
+                       parameters.programArgsQueryParameter.resolve(Collections.singletonList(argsWrongStr));
+               }
+               if (programArgsParType == ProgramArgsParType.List ||
+                       programArgsParType == ProgramArgsParType.Both) {
+                       parameters.programArgQueryParameter.resolve(wrongArgs);
+               }
+               return parameters;
        }
 
-       @Test
-       public void testParameterPrioritization() throws Exception {
-               // configure submission via query parameters and JSON request, 
JSON should be prioritized
-               sendRequestAndValidateGraph(
-                       handler,
-                       restfulGateway,
-                       () -> {
-                               final JarRunRequestBody jsonRequest = new 
JarRunRequestBody(
-                                       
ParameterProgram.class.getCanonicalName(),
-                                       "--host localhost --port 1234",
-                                       4,
-                                       true,
-                                       "/foo/bar"
-                               );
-
-                               final JarRunMessageParameters parameters = 
JarRunHeaders.getInstance().getUnresolvedMessageParameters();
-                               
parameters.allowNonRestoredStateQueryParameter.resolve(Collections.singletonList(false));
-                               
parameters.savepointPathQueryParameter.resolve(Collections.singletonList("/no/uh"));
-                               
parameters.entryClassQueryParameter.resolve(Collections.singletonList("please.dont.run.me"));
-                               
parameters.parallelismQueryParameter.resolve(Collections.singletonList(64));
-                               
parameters.programArgsQueryParameter.resolve(Collections.singletonList("--host 
wrong --port wrong"));
-
-                               return createRequest(
-                                       jsonRequest,
-                                       parameters,
-                                       jarWithoutManifest
-                               );
-                       },
-                       jobGraph -> {
-                               Assert.assertEquals(4, 
ParameterProgram.actualArguments.length);
-                               Assert.assertEquals("--host", 
ParameterProgram.actualArguments[0]);
-                               Assert.assertEquals("localhost", 
ParameterProgram.actualArguments[1]);
-                               Assert.assertEquals("--port", 
ParameterProgram.actualArguments[2]);
-                               Assert.assertEquals("1234", 
ParameterProgram.actualArguments[3]);
-
-                               Assert.assertEquals(4, 
getExecutionConfig(jobGraph).getParallelism());
-
-                               final SavepointRestoreSettings 
savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
-                               
Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
-                               Assert.assertEquals("/foo/bar", 
savepointRestoreSettings.getRestorePath());
-                       }
-               );
+       @Override
+       JarRunRequestBody getDefaultJarRequestBody() {
+               return new JarRunRequestBody();
        }
 
-       private static HandlerRequest<JarRunRequestBody, 
JarRunMessageParameters> createRequest(
-                       JarRunRequestBody requestBody,
-                       JarRunMessageParameters parameters,
-                       Path jar) throws HandlerRequestException {
-
-               final Map<String, List<String>> queryParameterAsMap = 
parameters.getQueryParameters().stream()
-                       .filter(MessageParameter::isResolved)
-                       .collect(Collectors.toMap(
-                               MessageParameter::getKey,
-                               JarRunHandlerParameterTest::getValuesAsString
-                       ));
-
-               return new HandlerRequest<>(
-                       requestBody,
-                       
JarRunHeaders.getInstance().getUnresolvedMessageParameters(),
-                       Collections.singletonMap(JarIdPathParameter.KEY, 
jar.getFileName().toString()),
-                       queryParameterAsMap,
-                       Collections.emptyList()
+       @Override
+       JarRunRequestBody getJarRequestBody(ProgramArgsParType programArgsParType) {
+               return new JarRunRequestBody(
+                       ParameterProgram.class.getCanonicalName(),
+                       getProgramArgsString(programArgsParType),
+                       getProgramArgsList(programArgsParType),
+                       PARALLELISM,
+                       ALLOW_NON_RESTORED_STATE_QUERY,
+                       RESTORE_PATH
                );
        }
 
-       private static void sendRequestAndValidateGraph(
-                       JarRunHandler handler,
-                       DispatcherGateway dispatcherGateway,
-                       SupplierWithException<HandlerRequest<JarRunRequestBody, 
JarRunMessageParameters>, HandlerRequestException> requestSupplier,
-                       ThrowingConsumer<JobGraph, AssertionError> validator) 
throws Exception {
-
-               handler.handleRequest(requestSupplier.get(), dispatcherGateway)
-                       .get();
-
-               JobGraph submittedJobGraph = 
lastSubmittedJobGraphReference.getAndSet(null);
-
-               validator.accept(submittedJobGraph);
+       @Override
+       void handleRequest(HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
+               throws Exception {
+               handler.handleRequest(request, restfulGateway).get();
        }
 
-       private static ExecutionConfig getExecutionConfig(JobGraph jobGraph) {
-               ExecutionConfig executionConfig;
-               try {
-                       executionConfig = 
jobGraph.getSerializedExecutionConfig().deserializeValue(ParameterProgram.class.getClassLoader());
-               } catch (Exception e) {
-                       throw new AssertionError("Exception while deserializing 
ExecutionConfig.", e);
-               }
-               return executionConfig;
+       @Override
+       JobGraph validateDefaultGraph() {
+               JobGraph jobGraph = super.validateDefaultGraph();
+               final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+               Assert.assertFalse(savepointRestoreSettings.allowNonRestoredState());
+               Assert.assertNull(savepointRestoreSettings.getRestorePath());
+               return jobGraph;
        }
 
-       private static <X> List<String> 
getValuesAsString(MessageQueryParameter<X> parameter) {
-               final List<X> values = parameter.getValue();
-               return 
values.stream().map(parameter::convertValueToString).collect(Collectors.toList());
+       @Override
+       JobGraph validateGraph() {
+               JobGraph jobGraph = super.validateGraph();
+               final SavepointRestoreSettings savepointRestoreSettings = jobGraph.getSavepointRestoreSettings();
+               Assert.assertTrue(savepointRestoreSettings.allowNonRestoredState());
+               Assert.assertEquals(RESTORE_PATH, savepointRestoreSettings.getRestorePath());
+               return jobGraph;
        }
 }
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
index 0706873c0fa..18fcd97d340 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
@@ -20,6 +20,8 @@
 
 import org.apache.flink.runtime.rest.messages.RestRequestMarshallingTestBase;
 
+import java.util.Arrays;
+
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -33,10 +35,11 @@
        }
 
        @Override
-       protected JarRunRequestBody getTestRequestInstance() throws Exception {
+       protected JarRunRequestBody getTestRequestInstance() {
                return new JarRunRequestBody(
                        "hello",
                        "world",
+                       Arrays.asList("boo", "far"),
                        4,
                        true,
                        "foo/bar"
@@ -49,6 +52,7 @@ protected void assertOriginalEqualsToUnmarshalled(
                        final JarRunRequestBody actual) {
                assertEquals(expected.getEntryClassName(), actual.getEntryClassName());
                assertEquals(expected.getProgramArguments(), actual.getProgramArguments());
+               assertEquals(expected.getProgramArgumentsList(), actual.getProgramArgumentsList());
                assertEquals(expected.getParallelism(), actual.getParallelism());
                assertEquals(expected.getAllowNonRestoredState(), actual.getAllowNonRestoredState());
                assertEquals(expected.getSavepointPath(), actual.getSavepointPath());
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
index e64c708dbe7..0753c494c63 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarSubmissionITCase.java
@@ -127,8 +127,8 @@ private static JarListInfo listJars(JarListHandler handler, RestfulGateway restf
 
        private static JobPlanInfo showPlan(JarPlanHandler handler, String jarName, RestfulGateway restfulGateway) throws Exception {
                JarPlanMessageParameters planParameters = JarPlanHeaders.getInstance().getUnresolvedMessageParameters();
-               HandlerRequest<EmptyRequestBody, JarPlanMessageParameters> planRequest = new HandlerRequest<>(
-                       EmptyRequestBody.getInstance(),
+               HandlerRequest<JarPlanRequestBody, JarPlanMessageParameters> planRequest = new HandlerRequest<>(
+                       new JarPlanRequestBody(),
                        planParameters,
                        Collections.singletonMap(planParameters.jarIdPathParameter.getKey(), jarName),
                        Collections.emptyMap(),
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtils.java
index c738df821bb..8b18f9907f5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtils.java
@@ -23,9 +23,12 @@
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
 import org.apache.flink.runtime.rest.messages.RequestBody;
+import org.apache.flink.util.function.SupplierWithException;
 
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import org.slf4j.Logger;
+
 import java.util.List;
 
 /**
@@ -63,4 +66,26 @@
                return value;
        }
 
+       /**
+        * Returns {@code requestValue} if it is not null, otherwise returns the query parameter value
+        * if it is not null, otherwise returns the default value.
+        */
+       public static <T> T fromRequestBodyOrQueryParameter(
+                       T requestValue,
+                       SupplierWithException<T, RestHandlerException> queryParameterExtractor,
+                       T defaultValue,
+                       Logger log) throws RestHandlerException {
+               if (requestValue != null) {
+                       return requestValue;
+               } else {
+                       T queryParameterValue = queryParameterExtractor.get();
+                       if (queryParameterValue != null) {
+                               log.warn("Configuring the job submission via query parameters is deprecated." +
+                                       " Please migrate to submitting a JSON request instead.");
+                               return queryParameterValue;
+                       } else {
+                               return defaultValue;
+                       }
+               }
+       }
 }
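
For readers skimming the diff, the precedence implemented by fromRequestBodyOrQueryParameter (request body first, then query parameter, then default) can be sketched in isolation. This is only an illustrative sketch, not the Flink code: the class name RequestValueFallback and the method fromBodyOrQuery are hypothetical, java.util.function.Supplier stands in for Flink's SupplierWithException, and System.err stands in for the SLF4J logger.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical stand-alone sketch of the fallback order used in the diff above.
public class RequestValueFallback {

    // Returns the body value if present, else the query parameter value
    // (with a deprecation warning), else the default value.
    static <T> T fromBodyOrQuery(T bodyValue, Supplier<T> queryExtractor, T defaultValue) {
        if (bodyValue != null) {
            return bodyValue;
        }
        T queryValue = queryExtractor.get();
        if (queryValue != null) {
            // Stand-in for log.warn(...) in the real handler utility.
            System.err.println("Query parameters are deprecated; submit a JSON request instead.");
            return queryValue;
        }
        return defaultValue;
    }

    public static void main(String[] args) {
        List<String> fromBody = Arrays.asList("boo", "far");

        // Body value wins even if a query parameter is also present.
        System.out.println(fromBodyOrQuery(fromBody, () -> Arrays.asList("ignored"), Collections.emptyList()));

        // No body value: the query parameter is used and a warning is printed.
        System.out.println(fromBodyOrQuery(null, () -> 4, 1));

        // Neither is set: the default is returned.
        System.out.println(fromBodyOrQuery(null, () -> null, 1));
    }
}
```

Note that the query parameter extractor is only invoked when the body value is null, so the deprecation warning is not emitted for requests that already use the JSON body.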


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


> Tokenisation of Program Args resulting in unexpected results
> ------------------------------------------------------------
>
>                 Key: FLINK-10295
>                 URL: https://issues.apache.org/jira/browse/FLINK-10295
>             Project: Flink
>          Issue Type: Bug
>          Components: REST, Webfrontend
>    Affects Versions: 1.5.0, 1.6.0, 1.7.0
>            Reporter: Gaurav Singhania
>            Assignee: Andrey Zagrebin
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>         Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details needed to run the job as program args against a jar ID, 
> including the SQL query and Kafka details. In version 1.5 the program args are 
> tokenised; as a result, single quotes (') and double quotes (") are stripped 
> from the arguments. This results in malformed args.
> A sample request is attached for reference.
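
To illustrate the kind of problem described in the report, here is a minimal sketch of a quote-aware tokenizer that drops the quote characters it consumes. It is an assumption-laden illustration only: naiveTokenize is a hypothetical helper and is not Flink's actual tokenisation code, and the sample argument string is made up. Passing the arguments as a list of separate strings, as the pull request above enables, avoids any re-tokenisation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration; this is NOT the tokenizer used by Flink.
public class QuoteStrippingDemo {

    // Splits on whitespace, treats ' and " as grouping delimiters,
    // and discards the delimiters themselves.
    static List<String> naiveTokenize(String args) {
        List<String> tokens = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        char quote = 0; // 0 means "not inside a quoted section"
        for (char c : args.toCharArray()) {
            if (quote != 0) {
                if (c == quote) {
                    quote = 0; // closing quote is dropped
                } else {
                    current.append(c);
                }
            } else if (c == '\'' || c == '"') {
                quote = c; // opening quote is dropped
            } else if (Character.isWhitespace(c)) {
                if (current.length() > 0) {
                    tokens.add(current.toString());
                    current.setLength(0);
                }
            } else {
                current.append(c);
            }
        }
        if (current.length() > 0) {
            tokens.add(current.toString());
        }
        return tokens;
    }

    public static void main(String[] args) {
        // A single string is re-tokenised and the quotes around bob are lost.
        System.out.println(naiveTokenize("--sql name='bob'"));

        // A list of separate strings is passed through unchanged.
        System.out.println(Arrays.asList("--sql", "name='bob'"));
    }
}
```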



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
