[FLINK-7715][flip6] Implement JarRunHandler

This closes #5509.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab8e9bdb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab8e9bdb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab8e9bdb

Branch: refs/heads/master
Commit: ab8e9bdb8428b8c9b2c7a72ccd3f9e7b6e2dad58
Parents: 2007338
Author: gyao <g...@data-artisans.com>
Authored: Fri Feb 16 15:15:19 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Feb 19 16:04:18 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/client/cli/CliFrontend.java    |   2 +-
 .../flink/client/program/PackagedProgram.java   |   4 +-
 .../AllowNonRestoredStateQueryParameter.java    |  45 ++++
 .../handlers/EntryClassQueryParameter.java      |  31 +++
 .../webmonitor/handlers/JarIdPathParameter.java |  44 ++++
 .../webmonitor/handlers/JarRunHandler.java      | 229 +++++++++++++++++++
 .../webmonitor/handlers/JarRunHeaders.java      |  60 +++++
 .../handlers/JarRunMessageParameters.java       |  60 +++++
 .../webmonitor/handlers/JarRunResponseBody.java |  54 +++++
 .../handlers/ParallelismQueryParameter.java     |  44 ++++
 .../handlers/ProgramArgsQueryParameter.java     |  33 +++
 .../handlers/SavepointPathQueryParameter.java   |  31 +++
 .../handlers/StringQueryParameter.java          |  42 ++++
 .../runtime/webmonitor/WebMonitorUtilsTest.java |  10 +-
 ...AllowNonRestoredStateQueryParameterTest.java |  48 ++++
 .../webmonitor/handlers/JarRunHandlerTest.java  | 100 ++++++++
 .../handlers/JarRunResponseBodyTest.java        |  47 ++++
 .../handlers/ParallelismQueryParameterTest.java |  44 ++++
 .../scripts/modules/submit/submit.ctrl.coffee   |  26 ++-
 .../web-dashboard/web/js/hs/index.js            |   4 +-
 flink-runtime-web/web-dashboard/web/js/index.js |   4 +-
 .../dispatcher/DispatcherRestEndpoint.java      |   3 +-
 .../runtime/webmonitor/WebMonitorUtils.java     |  33 ++-
 23 files changed, 979 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 23e82bc..2d984d6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -1126,7 +1126,7 @@ public class CliFrontend {
         * @return JobGraph extracted from the PackagedProgram
         * @throws ProgramInvocationException if the JobGraph generation failed
         */
-       private static JobGraph createJobGraph(Configuration configuration, 
PackagedProgram packagedProgram, int defaultParallelism) throws 
ProgramInvocationException {
+       public static JobGraph createJobGraph(Configuration configuration, 
PackagedProgram packagedProgram, int defaultParallelism) throws 
ProgramInvocationException {
                
Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
                final Optimizer optimizer = new Optimizer(new DataStatistics(), 
new DefaultCostEstimator(), configuration);
                final FlinkPlan flinkPlan;

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
index 2d8cb1d..d81cacb 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java
@@ -145,7 +145,7 @@ public class PackagedProgram {
         *         This invocation is thrown if the Program can't be properly 
loaded. Causes
         *         may be a missing / wrong class or manifest files.
         */
-       public PackagedProgram(File jarFile, String entryPointClassName, 
String... args) throws ProgramInvocationException {
+       public PackagedProgram(File jarFile, @Nullable String 
entryPointClassName, String... args) throws ProgramInvocationException {
                this(jarFile, Collections.<URL>emptyList(), 
entryPointClassName, args);
        }
 
@@ -168,7 +168,7 @@ public class PackagedProgram {
         *         This invocation is thrown if the Program can't be properly 
loaded. Causes
         *         may be a missing / wrong class or manifest files.
         */
-       public PackagedProgram(File jarFile, List<URL> classpaths, String 
entryPointClassName, String... args) throws ProgramInvocationException {
+       public PackagedProgram(File jarFile, List<URL> classpaths, @Nullable 
String entryPointClassName, String... args) throws ProgramInvocationException {
                if (jarFile == null) {
                        throw new IllegalArgumentException("The jar file must 
not be null.");
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
new file mode 100644
index 0000000..7ad014e
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameter.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+/**
+ * Query parameter that specifies whether non restored state is allowed if the 
savepoint
+ * contains state for an operator that is not part of the job.
+ *
+ * @see SavepointRestoreSettings#allowNonRestoredState()
+ */
+public class AllowNonRestoredStateQueryParameter extends 
MessageQueryParameter<Boolean> {
+
+       protected AllowNonRestoredStateQueryParameter() {
+               super("allowNonRestoredState", 
MessageParameterRequisiteness.OPTIONAL);
+       }
+
+       @Override
+       public Boolean convertValueFromString(final String value) {
+               return Boolean.valueOf(value);
+       }
+
+       @Override
+       public String convertStringToValue(final Boolean value) {
+               return value.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/EntryClassQueryParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/EntryClassQueryParameter.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/EntryClassQueryParameter.java
new file mode 100644
index 0000000..05298a6
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/EntryClassQueryParameter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import java.io.File;
+
+/**
+ * Query parameter specifying the name of the entry point class.
+ * @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, 
String, String...)
+ */
+public class EntryClassQueryParameter extends StringQueryParameter {
+       public EntryClassQueryParameter() {
+               super("entry-class", MessageParameterRequisiteness.OPTIONAL);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarIdPathParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarIdPathParameter.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarIdPathParameter.java
new file mode 100644
index 0000000..d4bd602
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarIdPathParameter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.ConversionException;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+
+/**
+ * Path parameter to identify uploaded jar files.
+ */
+public class JarIdPathParameter extends MessagePathParameter<String> {
+
+       public static final String KEY = "jarid";
+
+       protected JarIdPathParameter() {
+               super(KEY);
+       }
+
+       @Override
+       protected String convertFromString(final String value) throws 
ConversionException {
+               return value;
+       }
+
+       @Override
+       protected String convertToString(final String value) {
+               return value;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
new file mode 100644
index 0000000..47ea4d3
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
+/**
+ * Handler to submit jobs uploaded via the Web UI.
+ */
+public class JarRunHandler extends
+               AbstractRestHandler<DispatcherGateway, EmptyRequestBody, 
JarRunResponseBody, JarRunMessageParameters> {
+
+       private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = 
Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");
+
+       private final Path jarDir;
+
+       private final Configuration configuration;
+
+       private final Executor executor;
+
+       private final RestClusterClient<?> restClusterClient;
+
+       public JarRunHandler(
+                       final CompletableFuture<String> localRestAddress,
+                       final GatewayRetriever<? extends DispatcherGateway> 
leaderRetriever,
+                       final Time timeout,
+                       final Map<String, String> responseHeaders,
+                       final MessageHeaders<EmptyRequestBody, 
JarRunResponseBody, JarRunMessageParameters> messageHeaders,
+                       final Path jarDir,
+                       final Configuration configuration,
+                       final Executor executor) {
+               super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+
+               this.jarDir = requireNonNull(jarDir);
+               this.configuration = requireNonNull(configuration);
+               this.executor = requireNonNull(executor);
+               try {
+                       this.restClusterClient = new 
RestClusterClient<>(configuration, "Unknown cluster id");
+               } catch (Exception e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       @Override
+       protected CompletableFuture<JarRunResponseBody> handleRequest(
+                       @Nonnull final HandlerRequest<EmptyRequestBody, 
JarRunMessageParameters> request,
+                       @Nonnull final DispatcherGateway gateway) throws 
RestHandlerException {
+
+               final String pathParameter = 
request.getPathParameter(JarIdPathParameter.class);
+               final Path jarFile = jarDir.resolve(pathParameter);
+
+               final String entryClass = 
emptyToNull(getQueryParameter(request, EntryClassQueryParameter.class));
+               final List<String> programArgs = 
tokenizeArguments(getQueryParameter(request, ProgramArgsQueryParameter.class));
+               final int parallelism = getQueryParameter(request, 
ParallelismQueryParameter.class, -1);
+               final SavepointRestoreSettings savepointRestoreSettings = 
getSavepointRestoreSettings(request);
+
+               final CompletableFuture<JobGraph> jobGraphFuture = 
getJobGraphAsync(
+                       jarFile,
+                       entryClass,
+                       programArgs,
+                       savepointRestoreSettings,
+                       parallelism);
+
+               return jobGraphFuture.thenCompose(jobGraph -> restClusterClient
+                       .submitJob(jobGraph)
+                       .thenApply((jobSubmitResponseBody -> new 
JarRunResponseBody(jobGraph.getJobID()))))
+                       .exceptionally(throwable -> {
+                               throw new CompletionException(new 
RestHandlerException(
+                                       throwable.getMessage(),
+                                       
HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                                       throwable));
+                       });
+       }
+
+       private static SavepointRestoreSettings getSavepointRestoreSettings(
+                       final @Nonnull HandlerRequest<EmptyRequestBody, 
JarRunMessageParameters> request)
+                               throws RestHandlerException {
+
+               final boolean allowNonRestoredState = 
getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false);
+               final String savepointPath = getQueryParameter(request, 
SavepointPathQueryParameter.class);
+               final SavepointRestoreSettings savepointRestoreSettings;
+               if (savepointPath != null) {
+                       savepointRestoreSettings = 
SavepointRestoreSettings.forPath(
+                               savepointPath,
+                               allowNonRestoredState);
+               } else {
+                       savepointRestoreSettings = 
SavepointRestoreSettings.none();
+               }
+               return savepointRestoreSettings;
+       }
+
+       private CompletableFuture<JobGraph> getJobGraphAsync(
+                       final Path jarFile,
+                       @Nullable final String entryClass,
+                       final List<String> programArgs,
+                       final SavepointRestoreSettings savepointRestoreSettings,
+                       final int parallelism) {
+
+               return CompletableFuture.supplyAsync(() -> {
+                       if (!Files.exists(jarFile)) {
+                               throw new CompletionException(new 
RestHandlerException(
+                                       String.format("Jar file %s does not 
exist", jarFile), HttpResponseStatus.BAD_REQUEST));
+                       }
+
+                       final JobGraph jobGraph;
+                       try {
+                               final PackagedProgram packagedProgram = new 
PackagedProgram(
+                                       jarFile.toFile(),
+                                       entryClass,
+                                       programArgs.toArray(new 
String[programArgs.size()]));
+                               jobGraph = 
CliFrontend.createJobGraph(configuration, packagedProgram, parallelism);
+                       } catch (final ProgramInvocationException e) {
+                               throw new CompletionException(e);
+                       }
+                       
jobGraph.setSavepointRestoreSettings(savepointRestoreSettings);
+                       return jobGraph;
+               }, executor);
+       }
+
+       /**
+        * Takes program arguments as a single string, and splits them into a 
list of string.
+        *
+        * <pre>
+        * tokenizeArguments("--foo bar")            = ["--foo" "bar"]
+        * tokenizeArguments("--foo \"bar baz\"")    = ["--foo" "bar baz"]
+        * tokenizeArguments("--foo 'bar baz'")      = ["--foo" "bar baz"]
+        * </pre>
+        *
+        * <strong>WARNING: </strong>This method does not respect escaped 
quotes.
+        */
+       @VisibleForTesting
+       static List<String> tokenizeArguments(@Nullable final String args) {
+               if (args == null) {
+                       return Collections.emptyList();
+               }
+               final Matcher matcher = 
ARGUMENTS_TOKENIZE_PATTERN.matcher(args);
+               final List<String> tokens = new ArrayList<>();
+               while (matcher.find()) {
+                       tokens.add(matcher.group()
+                               .trim()
+                               .replace("\"", "")
+                               .replace("\'", ""));
+               }
+               return tokens;
+       }
+
+       /**
+        * Returns the value of a query parameter, or {@code null} if the query 
parameter is not set.
+        * @throws RestHandlerException If the query parameter is repeated.
+        */
+       @VisibleForTesting
+       static <X, P extends MessageQueryParameter<X>> X getQueryParameter(
+                       final HandlerRequest<EmptyRequestBody, 
JarRunMessageParameters> request,
+                       final Class<P> queryParameterClass) throws 
RestHandlerException {
+               return getQueryParameter(request, queryParameterClass, null);
+       }
+
+       @VisibleForTesting
+       static <X, P extends MessageQueryParameter<X>> X getQueryParameter(
+                       final HandlerRequest<EmptyRequestBody, 
JarRunMessageParameters> request,
+                       final Class<P> queryParameterClass,
+                       final X defaultValue) throws RestHandlerException {
+
+               final List<X> values = 
request.getQueryParameter(queryParameterClass);
+               final X value;
+               if (values.size() > 1) {
+                       throw new RestHandlerException(
+                               String.format("Expected only one value %s.", 
values),
+                               HttpResponseStatus.BAD_REQUEST);
+               } else if (values.size() == 1) {
+                       value = values.get(0);
+               } else {
+                       value = defaultValue;
+               }
+               return value;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
new file mode 100644
index 0000000..a6d7ecb
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * {@link MessageHeaders} for {@link JarRunHandler}.
+ */
+public class JarRunHeaders implements MessageHeaders<EmptyRequestBody, 
JarRunResponseBody, JarRunMessageParameters> {
+       @Override
+       public Class<JarRunResponseBody> getResponseClass() {
+               return JarRunResponseBody.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public JarRunMessageParameters getUnresolvedMessageParameters() {
+               return new JarRunMessageParameters();
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.POST;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return "/jars/:" + JarIdPathParameter.KEY + "/run";
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
new file mode 100644
index 0000000..2d9428c
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * {@link MessageParameters} for {@link JarRunHandler}.
+ */
+public class JarRunMessageParameters extends MessageParameters {
+
+       private final JarIdPathParameter jarIdPathParameter = new 
JarIdPathParameter();
+
+       private final ProgramArgsQueryParameter programArgsQueryParameter = new 
ProgramArgsQueryParameter();
+
+       private final EntryClassQueryParameter entryClassQueryParameter = new 
EntryClassQueryParameter();
+
+       private final ParallelismQueryParameter parallelismQueryParameter = new 
ParallelismQueryParameter();
+
+       private final AllowNonRestoredStateQueryParameter 
allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
+
+       private final SavepointPathQueryParameter savepointPathQueryParameter = 
new SavepointPathQueryParameter();
+
+       @Override
+       public Collection<MessagePathParameter<?>> getPathParameters() {
+               return Collections.singletonList(jarIdPathParameter);
+       }
+
+       @Override
+       public Collection<MessageQueryParameter<?>> getQueryParameters() {
+               return Collections.unmodifiableCollection(Arrays.asList(
+                       programArgsQueryParameter,
+                       entryClassQueryParameter,
+                       parallelismQueryParameter,
+                       allowNonRestoredStateQueryParameter,
+                       savepointPathQueryParameter));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBody.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBody.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBody.java
new file mode 100644
index 0000000..1195249
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBody.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.json.JobIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobIDSerializer;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Response for {@link JarRunHandler}.
+ */
+public class JarRunResponseBody implements ResponseBody {
+
+       @JsonProperty("jobid")
+       @JsonDeserialize(using = JobIDDeserializer.class)
+       @JsonSerialize(using = JobIDSerializer.class)
+       private final JobID jobId;
+
+       @JsonCreator
+       public JarRunResponseBody(
+                       @JsonProperty("jobid")
+                       @JsonDeserialize(using = JobIDDeserializer.class) final 
JobID jobId) {
+               this.jobId = requireNonNull(jobId);
+       }
+
+       public JobID getJobId() {
+               return jobId;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
new file mode 100644
index 0000000..26cb16c
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameter.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+/**
+ * Query parameter specifying the parallelism of the job.
+ * @see JarRunHandler
+ */
+public class ParallelismQueryParameter extends MessageQueryParameter<Integer> {
+
+       private static final String KEY = "parallelism";
+
+       public ParallelismQueryParameter() {
+               super(KEY, MessageParameterRequisiteness.OPTIONAL);
+       }
+
+       @Override
+       public Integer convertValueFromString(final String value) {
+               return Integer.valueOf(value);
+       }
+
+       @Override
+       public String convertStringToValue(final Integer value) {
+               return value.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
new file mode 100644
index 0000000..8f546ab
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import java.io.File;
+
+/**
+ * Query parameter specifying the arguments for the program.
+ * @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, 
String, String...)
+ */
+public class ProgramArgsQueryParameter extends StringQueryParameter {
+
+       public ProgramArgsQueryParameter() {
+               super("program-args", MessageParameterRequisiteness.OPTIONAL);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SavepointPathQueryParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SavepointPathQueryParameter.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SavepointPathQueryParameter.java
new file mode 100644
index 0000000..c1c6d2b
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SavepointPathQueryParameter.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+/**
+ * Query parameter that specifies the savepoint to restore from.
+ */
+public class SavepointPathQueryParameter  extends StringQueryParameter {
+
+       public static final String KEY = "savepointPath";
+
+       public SavepointPathQueryParameter() {
+               super(KEY, MessageParameterRequisiteness.OPTIONAL);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
new file mode 100644
index 0000000..226c592
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/StringQueryParameter.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+/**
+ * Abstract super class for {@link String} query parameters.
+ */
+public abstract class StringQueryParameter extends 
MessageQueryParameter<String> {
+
+       public StringQueryParameter(final String key, final 
MessageParameterRequisiteness requisiteness) {
+               super(key, requisiteness);
+       }
+
+       @Override
+       public final String convertValueFromString(final String value) {
+               return value;
+       }
+
+       @Override
+       public final String convertStringToValue(final String value) {
+               return value;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
index 772584a..e62ac3e 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
@@ -19,9 +19,12 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -37,7 +40,7 @@ import static org.junit.Assert.assertThat;
 /**
  * Tests for the WebMonitorUtils.
  */
-public class WebMonitorUtilsTest {
+public class WebMonitorUtilsTest extends TestLogger {
 
        @Test
        public void testGetArchivers() {
@@ -55,12 +58,15 @@ public class WebMonitorUtilsTest {
         */
        @Test
        public void testTryLoadJarHandlers() {
+               final Configuration configuration = new Configuration();
+               configuration.setString(JobManagerOptions.ADDRESS, "localhost");
                assertThat(WebMonitorUtils.tryLoadJarHandlers(
                        CompletableFuture::new,
                        CompletableFuture.completedFuture("localhost:12345"),
                        Time.seconds(10),
                        Collections.emptyMap(),
                        Paths.get("/tmp"),
-                       Executors.directExecutor()), not(empty()));
+                       Executors.directExecutor(),
+                       configuration), not(empty()));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameterTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameterTest.java
new file mode 100644
index 0000000..9882637
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/AllowNonRestoredStateQueryParameterTest.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link AllowNonRestoredStateQueryParameter}.
+ */
+public class AllowNonRestoredStateQueryParameterTest extends TestLogger {
+
+       private final AllowNonRestoredStateQueryParameter 
allowNonRestoredStateQueryParameter =
+               new AllowNonRestoredStateQueryParameter();
+
+       @Test
+       public void testConvertStringToValue() {
+               assertEquals("false", 
allowNonRestoredStateQueryParameter.convertStringToValue(false));
+               assertEquals("true", 
allowNonRestoredStateQueryParameter.convertStringToValue(true));
+       }
+
+       @Test
+       public void testConvertValueFromString() {
+               assertEquals(false, 
allowNonRestoredStateQueryParameter.convertValueFromString("false"));
+               assertEquals(true, 
allowNonRestoredStateQueryParameter.convertValueFromString("true"));
+               assertEquals(true, 
allowNonRestoredStateQueryParameter.convertValueFromString("TRUE"));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
new file mode 100644
index 0000000..47feebc
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.hamcrest.Matchers.containsString;
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link JarRunHandler}.
+ */
+public class JarRunHandlerTest extends TestLogger {
+
+       @Test
+       public void testTokenizeNonQuoted() {
+               final List<String> arguments = 
JarRunHandler.tokenizeArguments("--foo bar");
+               assertThat(arguments.get(0), equalTo("--foo"));
+               assertThat(arguments.get(1), equalTo("bar"));
+       }
+
+       @Test
+       public void testTokenizeSingleQuoted() {
+               final List<String> arguments = 
JarRunHandler.tokenizeArguments("--foo 'bar baz '");
+               assertThat(arguments.get(0), equalTo("--foo"));
+               assertThat(arguments.get(1), equalTo("bar baz "));
+       }
+
+       @Test
+       public void testTokenizeDoubleQuoted() {
+               final List<String> arguments = 
JarRunHandler.tokenizeArguments("--name \"K. Bote \"");
+               assertThat(arguments.get(0), equalTo("--name"));
+               assertThat(arguments.get(1), equalTo("K. Bote "));
+       }
+
+       @Test
+       public void testGetQueryParameter() throws Exception {
+               final Boolean queryParameter = JarRunHandler.getQueryParameter(
+                       new HandlerRequest<>(
+                               EmptyRequestBody.getInstance(),
+                               new JarRunMessageParameters(),
+                               Collections.emptyMap(),
+                               
Collections.singletonMap("allowNonRestoredState", 
Collections.singletonList("true"))),
+                       AllowNonRestoredStateQueryParameter.class);
+               assertThat(queryParameter, equalTo(true));
+       }
+
+       @Test
+       public void testGetQueryParameterRepeated() throws Exception {
+               try {
+                       JarRunHandler.getQueryParameter(
+                               new HandlerRequest<>(
+                                       EmptyRequestBody.getInstance(),
+                                       new JarRunMessageParameters(),
+                                       Collections.emptyMap(),
+                                       
Collections.singletonMap("allowNonRestoredState", Arrays.asList("true", 
"false"))),
+                               AllowNonRestoredStateQueryParameter.class);
+               } catch (final RestHandlerException e) {
+                       assertThat(e.getMessage(), containsString("Expected 
only one value"));
+               }
+       }
+
+       @Test
+       public void testGetQueryParameterDefaultValue() throws Exception {
+               final Boolean allowNonRestoredState = 
JarRunHandler.getQueryParameter(
+                       new HandlerRequest<>(
+                               EmptyRequestBody.getInstance(),
+                               new JarRunMessageParameters(),
+                               Collections.emptyMap(),
+                               
Collections.singletonMap("allowNonRestoredState", Collections.emptyList())),
+                       AllowNonRestoredStateQueryParameter.class, true);
+               assertThat(allowNonRestoredState, equalTo(true));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBodyTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBodyTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBodyTest.java
new file mode 100644
index 0000000..92497c6
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunResponseBodyTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link JarRunResponseBody}.
+ */
+public class JarRunResponseBodyTest extends 
RestResponseMarshallingTestBase<JarRunResponseBody> {
+
+       @Override
+       protected Class<JarRunResponseBody> getTestResponseClass() {
+               return JarRunResponseBody.class;
+       }
+
+       @Override
+       protected JarRunResponseBody getTestResponseInstance() throws Exception 
{
+               return new JarRunResponseBody(new JobID());
+       }
+
+       @Override
+       protected void assertOriginalEqualsToUnmarshalled(
+                       final JarRunResponseBody expected,
+                       final JarRunResponseBody actual) {
+               assertEquals(expected.getJobId(), actual.getJobId());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameterTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameterTest.java
new file mode 100644
index 0000000..8189dd5
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/ParallelismQueryParameterTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link ParallelismQueryParameter}.
+ */
+public class ParallelismQueryParameterTest extends TestLogger {
+
+       private final ParallelismQueryParameter parallelismQueryParameter = new 
ParallelismQueryParameter();
+
+       @Test
+       public void testConvertStringToValue()   {
+               assertEquals("42", 
parallelismQueryParameter.convertStringToValue(42));
+       }
+
+       @Test
+       public void testConvertValueFromString() {
+               assertEquals(42, (int) 
parallelismQueryParameter.convertValueFromString("42"));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee 
b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
index bfd39b9..4e14a65 100644
--- 
a/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
+++ 
b/flink-runtime-web/web-dashboard/app/scripts/modules/submit/submit.ctrl.coffee
@@ -98,14 +98,26 @@ angular.module('flinkApp')
       $scope.state['submit-button'] = "Submitting"
       $scope.state['plan-button'] = "Show Plan"
       $scope.error = null
+
+      queryParameters = {}
+
+      if $scope.state['entry-class']
+        queryParameters['entry-class'] = $scope.state['entry-class']
+
+      if $scope.state.parallelism
+        queryParameters['parallelism'] = $scope.state['parallelism']
+
+      if $scope.state['program-args']
+        queryParameters['program-args'] = $scope.state['program-args']
+
+      if $scope.state['savepointPath']
+        queryParameters['savepointPath'] = $scope.state['savepointPath']
+
+      if $scope.state['allowNonRestoredState']
+        queryParameters['allowNonRestoredState'] = 
$scope.state['allowNonRestoredState']
+
       JobSubmitService.runJob(
-        $scope.state.selected, {
-          'entry-class': $scope.state['entry-class'],
-          parallelism: $scope.state.parallelism,
-          'program-args': $scope.state['program-args'],
-          savepointPath: $scope.state['savepointPath'],
-          allowNonRestoredState: $scope.state['allowNonRestoredState']
-        }
+        $scope.state.selected, queryParameters
       ).then (data) ->
         if action == $scope.state['action-time']
           $scope.state['submit-button'] = "Submit"

Reply via email to