[FLINK-7715] [flip6] Introduce WebSubmissionExtension for web submission 
handlers

Introduce a WebMonitorExtension interface which can be used to dynamically load
web monitor extensions. Web monitor extension provide channel inbound handlers
which are added to the WebMonitorEndpoint. Furthermore, they offer a close and
closeAsync method to close their resources. That way they can be integrated in
the lifecycle of the WebMonitorEndpoint.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ec752138
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ec752138
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ec752138

Branch: refs/heads/master
Commit: ec752138b0d1352c6a18e116fd469bcc1ca4dd42
Parents: ab8e9bd
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Sun Feb 18 12:21:45 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Mon Feb 19 17:27:33 2018 +0100

----------------------------------------------------------------------
 .../apache/flink/client/cli/CliFrontend.java    |  67 +--------
 .../client/program/PackagedProgramUtils.java    | 103 ++++++++++++++
 .../apache/flink/util/AutoCloseableAsync.java   |  46 +++++++
 .../webmonitor/WebSubmissionExtension.java      | 110 +++++++++++++++
 .../webmonitor/handlers/JarListHeaders.java     |   7 +
 .../webmonitor/handlers/JarRunHandler.java      |  21 ++-
 .../webmonitor/handlers/JarRunHeaders.java      |   9 ++
 .../webmonitor/handlers/JarUploadHandler.java   |   8 +-
 .../handlers/JarUploadMessageHeaders.java       |   7 +
 .../runtime/webmonitor/WebMonitorUtilsTest.java |  11 +-
 .../handlers/JarUploadHandlerTest.java          |   2 +-
 .../dispatcher/DispatcherRestEndpoint.java      |  55 ++++++--
 .../runtime/webmonitor/WebMonitorExtension.java |  65 +++++++++
 .../runtime/webmonitor/WebMonitorUtils.java     | 136 ++++++-------------
 14 files changed, 457 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 2d984d6..a849de1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -23,14 +23,12 @@ import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
-import org.apache.flink.api.common.Plan;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.JobWithJars;
-import org.apache.flink.client.program.OptimizerPlanEnvironment;
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.ProgramMissingJobException;
 import org.apache.flink.client.program.ProgramParametrizationException;
@@ -40,7 +38,6 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.costs.DefaultCostEstimator;
@@ -48,7 +45,6 @@ import org.apache.flink.optimizer.plan.FlinkPlan;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
-import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.concurrent.FutureUtils;
@@ -74,7 +70,6 @@ import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.net.InetSocketAddress;
-import java.net.URISyntaxException;
 import java.net.URL;
 import java.text.SimpleDateFormat;
 import java.util.ArrayList;
@@ -229,7 +224,7 @@ public class CliFrontend {
                        if (flip6 && clusterId == null && 
runOptions.getDetachedMode()) {
                                int parallelism = runOptions.getParallelism() 
== -1 ? defaultParallelism : runOptions.getParallelism();
 
-                               final JobGraph jobGraph = 
createJobGraph(configuration, program, parallelism);
+                               final JobGraph jobGraph = 
PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
 
                                final ClusterSpecification clusterSpecification 
= customCommandLine.getClusterSpecification(commandLine);
                                client = clusterDescriptor.deployJobCluster(
@@ -1117,64 +1112,6 @@ public class CliFrontend {
                return customCommandLines;
        }
 
-       /**
-        * Creates a {@link JobGraph} from the given {@link PackagedProgram}.
-        *
-        * @param configuration to use for the optimizer and job graph generator
-        * @param packagedProgram to extract the JobGraph from
-        * @param defaultParallelism for the JobGraph
-        * @return JobGraph extracted from the PackagedProgram
-        * @throws ProgramInvocationException if the JobGraph generation failed
-        */
-       public static JobGraph createJobGraph(Configuration configuration, 
PackagedProgram packagedProgram, int defaultParallelism) throws 
ProgramInvocationException {
-               
Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
-               final Optimizer optimizer = new Optimizer(new DataStatistics(), 
new DefaultCostEstimator(), configuration);
-               final FlinkPlan flinkPlan;
-
-               if (packagedProgram.isUsingProgramEntryPoint()) {
-
-                       final JobWithJars jobWithJars = 
packagedProgram.getPlanWithJars();
-
-                       final Plan plan = jobWithJars.getPlan();
-
-                       if (plan.getDefaultParallelism() <= 0) {
-                               plan.setDefaultParallelism(defaultParallelism);
-                       }
-
-                       flinkPlan = optimizer.compile(jobWithJars.getPlan());
-               } else if (packagedProgram.isUsingInteractiveMode()) {
-                       final OptimizerPlanEnvironment optimizerPlanEnvironment 
= new OptimizerPlanEnvironment(optimizer);
-
-                       
optimizerPlanEnvironment.setParallelism(defaultParallelism);
-
-                       flinkPlan = 
optimizerPlanEnvironment.getOptimizedPlan(packagedProgram);
-               } else {
-                       throw new ProgramInvocationException("PackagedProgram 
does not have a valid invocation mode.");
-               }
-
-               final JobGraph jobGraph;
-
-               if (flinkPlan instanceof StreamingPlan) {
-                       jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
-                       
jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
-               } else {
-                       final JobGraphGenerator jobGraphGenerator = new 
JobGraphGenerator(configuration);
-                       jobGraph = 
jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan);
-               }
-
-               for (URL url : packagedProgram.getAllLibraries()) {
-                       try {
-                               jobGraph.addJar(new Path(url.toURI()));
-                       } catch (URISyntaxException e) {
-                               throw new ProgramInvocationException("Invalid 
URL for jar file: " + url + '.', e);
-                       }
-               }
-
-               jobGraph.setClasspaths(packagedProgram.getClasspaths());
-
-               return jobGraph;
-       }
-
        // 
--------------------------------------------------------------------------------------------
        //  Custom command-line
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
new file mode 100644
index 0000000..5765d1f
--- /dev/null
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.client.program;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.FlinkPlan;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.StreamingPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+import java.net.URISyntaxException;
+import java.net.URL;
+
+/**
+ * Utility class for {@link PackagedProgram} related operations.
+ */
+public class PackagedProgramUtils {
+
+       /**
+        * Creates a {@link JobGraph} from the given {@link PackagedProgram}.
+        *
+        * @param packagedProgram to extract the JobGraph from
+        * @param configuration to use for the optimizer and job graph generator
+        * @param defaultParallelism for the JobGraph
+        * @return JobGraph extracted from the PackagedProgram
+        * @throws ProgramInvocationException if the JobGraph generation failed
+        */
+       public static JobGraph createJobGraph(
+                       PackagedProgram packagedProgram,
+                       Configuration configuration,
+                       int defaultParallelism) throws 
ProgramInvocationException {
+               
Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
+               final Optimizer optimizer = new Optimizer(new DataStatistics(), 
new DefaultCostEstimator(), configuration);
+               final FlinkPlan flinkPlan;
+
+               if (packagedProgram.isUsingProgramEntryPoint()) {
+
+                       final JobWithJars jobWithJars = 
packagedProgram.getPlanWithJars();
+
+                       final Plan plan = jobWithJars.getPlan();
+
+                       if (plan.getDefaultParallelism() <= 0) {
+                               plan.setDefaultParallelism(defaultParallelism);
+                       }
+
+                       flinkPlan = optimizer.compile(jobWithJars.getPlan());
+               } else if (packagedProgram.isUsingInteractiveMode()) {
+                       final OptimizerPlanEnvironment optimizerPlanEnvironment 
= new OptimizerPlanEnvironment(optimizer);
+
+                       
optimizerPlanEnvironment.setParallelism(defaultParallelism);
+
+                       flinkPlan = 
optimizerPlanEnvironment.getOptimizedPlan(packagedProgram);
+               } else {
+                       throw new ProgramInvocationException("PackagedProgram 
does not have a valid invocation mode.");
+               }
+
+               final JobGraph jobGraph;
+
+               if (flinkPlan instanceof StreamingPlan) {
+                       jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
+                       
jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
+               } else {
+                       final JobGraphGenerator jobGraphGenerator = new 
JobGraphGenerator(configuration);
+                       jobGraph = 
jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan);
+               }
+
+               for (URL url : packagedProgram.getAllLibraries()) {
+                       try {
+                               jobGraph.addJar(new Path(url.toURI()));
+                       } catch (URISyntaxException e) {
+                               throw new ProgramInvocationException("Invalid 
URL for jar file: " + url + '.', e);
+                       }
+               }
+
+               jobGraph.setClasspaths(packagedProgram.getClasspaths());
+
+               return jobGraph;
+       }
+
+       private PackagedProgramUtils() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-core/src/main/java/org/apache/flink/util/AutoCloseableAsync.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/AutoCloseableAsync.java 
b/flink-core/src/main/java/org/apache/flink/util/AutoCloseableAsync.java
new file mode 100644
index 0000000..ef576e9
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/util/AutoCloseableAsync.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Closeable interface which allows to close a resource in a non
+ * blocking fashion.
+ */
+public interface AutoCloseableAsync extends AutoCloseable {
+
+       /**
+        * Trigger the closing of the resource and return the corresponding
+        * close future.
+        *
+        * @return Future which is completed once the resource has been closed
+        */
+       CompletableFuture<Void> closeAsync();
+
+       default void close() throws Exception {
+               try {
+                       closeAsync().get();
+               } catch (ExecutionException e) {
+                       throw new FlinkException("Could not close resource.", 
ExceptionUtils.stripExecutionException(e));
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
new file mode 100644
index 0000000..bbb5f98
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebSubmissionExtension.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.runtime.webmonitor.handlers.JarListHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JarListHeaders;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JarRunHeaders;
+import org.apache.flink.runtime.webmonitor.handlers.JarUploadHandler;
+import org.apache.flink.runtime.webmonitor.handlers.JarUploadMessageHeaders;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Container for the web submission handlers.
+ */
+public class WebSubmissionExtension implements WebMonitorExtension {
+
+       private final ArrayList<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> webSubmissionHandlers;
+
+       private final RestClusterClient<?> restClusterClient;
+
+       public WebSubmissionExtension(
+                       Configuration configuration,
+                       CompletableFuture<String> restAddressFuture,
+                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       Map<String, String> responseHeaders,
+                       Path jarDir,
+                       Executor executor,
+                       Time timeout) throws Exception {
+
+               restClusterClient = new RestClusterClient<>(configuration, 
"WebSubmissionHandlers");
+
+               webSubmissionHandlers = new ArrayList<>(3);
+
+               final JarUploadHandler jarUploadHandler = new JarUploadHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders,
+                       JarUploadMessageHeaders.getInstance(),
+                       jarDir,
+                       executor);
+
+               final JarListHandler jarListHandler = new JarListHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders,
+                       JarListHeaders.getInstance(),
+                       jarDir.toFile(),
+                       executor);
+
+               final JarRunHandler jarRunHandler = new JarRunHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders,
+                       JarRunHeaders.getInstance(),
+                       jarDir,
+                       configuration,
+                       executor,
+                       restClusterClient);
+
+               
webSubmissionHandlers.add(Tuple2.of(JarUploadMessageHeaders.getInstance(), 
jarUploadHandler));
+               
webSubmissionHandlers.add(Tuple2.of(JarListHeaders.getInstance(), 
jarListHandler));
+               
webSubmissionHandlers.add(Tuple2.of(JarRunHeaders.getInstance(), 
jarRunHandler));
+       }
+
+       @Override
+       public CompletableFuture<Void> closeAsync() {
+               restClusterClient.shutdown();
+
+               return CompletableFuture.completedFuture(null);
+       }
+
+       @Override
+       public Collection<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> getHandlers() {
+               return webSubmissionHandlers;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHeaders.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHeaders.java
index e771faf..b9036d9 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHeaders.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHeaders.java
@@ -32,6 +32,10 @@ public class JarListHeaders implements 
MessageHeaders<EmptyRequestBody, JarListI
 
        public static final String URL = "/jars";
 
+       private static final JarListHeaders INSTANCE = new JarListHeaders();
+
+       private JarListHeaders() {}
+
        @Override
        public Class<EmptyRequestBody> getRequestClass() {
                return EmptyRequestBody.class;
@@ -62,4 +66,7 @@ public class JarListHeaders implements 
MessageHeaders<EmptyRequestBody, JarListI
                return URL;
        }
 
+       public static JarListHeaders getInstance() {
+               return INSTANCE;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 47ea4d3..bf2286f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -20,12 +20,11 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
@@ -34,6 +33,7 @@ import 
org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -60,7 +60,7 @@ import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emp
  * Handler to submit jobs uploaded via the Web UI.
  */
 public class JarRunHandler extends
-               AbstractRestHandler<DispatcherGateway, EmptyRequestBody, 
JarRunResponseBody, JarRunMessageParameters> {
+               AbstractRestHandler<RestfulGateway, EmptyRequestBody, 
JarRunResponseBody, JarRunMessageParameters> {
 
        private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = 
Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");
 
@@ -74,29 +74,26 @@ public class JarRunHandler extends
 
        public JarRunHandler(
                        final CompletableFuture<String> localRestAddress,
-                       final GatewayRetriever<? extends DispatcherGateway> 
leaderRetriever,
+                       final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        final Time timeout,
                        final Map<String, String> responseHeaders,
                        final MessageHeaders<EmptyRequestBody, 
JarRunResponseBody, JarRunMessageParameters> messageHeaders,
                        final Path jarDir,
                        final Configuration configuration,
-                       final Executor executor) {
+                       final Executor executor,
+                       final RestClusterClient<?> restClusterClient) {
                super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
 
                this.jarDir = requireNonNull(jarDir);
                this.configuration = requireNonNull(configuration);
                this.executor = requireNonNull(executor);
-               try {
-                       this.restClusterClient = new 
RestClusterClient<>(configuration, "Unknown cluster id");
-               } catch (Exception e) {
-                       throw new RuntimeException(e);
-               }
+               this.restClusterClient = requireNonNull(restClusterClient);
        }
 
        @Override
        protected CompletableFuture<JarRunResponseBody> handleRequest(
                        @Nonnull final HandlerRequest<EmptyRequestBody, 
JarRunMessageParameters> request,
-                       @Nonnull final DispatcherGateway gateway) throws 
RestHandlerException {
+                       @Nonnull final RestfulGateway gateway) throws 
RestHandlerException {
 
                final String pathParameter = 
request.getPathParameter(JarIdPathParameter.class);
                final Path jarFile = jarDir.resolve(pathParameter);
@@ -160,7 +157,7 @@ public class JarRunHandler extends
                                        jarFile.toFile(),
                                        entryClass,
                                        programArgs.toArray(new 
String[programArgs.size()]));
-                               jobGraph = 
CliFrontend.createJobGraph(configuration, packagedProgram, parallelism);
+                               jobGraph = 
PackagedProgramUtils.createJobGraph(packagedProgram, configuration, 
parallelism);
                        } catch (final ProgramInvocationException e) {
                                throw new CompletionException(e);
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
index a6d7ecb..0ed035a 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHeaders.java
@@ -28,6 +28,11 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
  * {@link MessageHeaders} for {@link JarRunHandler}.
  */
 public class JarRunHeaders implements MessageHeaders<EmptyRequestBody, 
JarRunResponseBody, JarRunMessageParameters> {
+
+       private static final JarRunHeaders INSTANCE = new JarRunHeaders();
+
+       private JarRunHeaders() {}
+
        @Override
        public Class<JarRunResponseBody> getResponseClass() {
                return JarRunResponseBody.class;
@@ -57,4 +62,8 @@ public class JarRunHeaders implements 
MessageHeaders<EmptyRequestBody, JarRunRes
        public String getTargetRestEndpointURL() {
                return "/jars/:" + JarIdPathParameter.KEY + "/run";
        }
+
+       public static JarRunHeaders getInstance() {
+               return INSTANCE;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 66ae03b..c2f16a7 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -19,13 +19,13 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.dispatcher.DispatcherGateway;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.FileUpload;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
@@ -46,7 +46,7 @@ import static java.util.Objects.requireNonNull;
  * Handles .jar file uploads.
  */
 public class JarUploadHandler extends
-               AbstractRestHandler<DispatcherGateway, FileUpload, 
JarUploadResponseBody, EmptyMessageParameters> {
+               AbstractRestHandler<RestfulGateway, FileUpload, 
JarUploadResponseBody, EmptyMessageParameters> {
 
        private final Path jarDir;
 
@@ -54,7 +54,7 @@ public class JarUploadHandler extends
 
        public JarUploadHandler(
                        final CompletableFuture<String> localRestAddress,
-                       final GatewayRetriever<? extends DispatcherGateway> 
leaderRetriever,
+                       final GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        final Time timeout,
                        final Map<String, String> responseHeaders,
                        final MessageHeaders<FileUpload, JarUploadResponseBody, 
EmptyMessageParameters> messageHeaders,
@@ -68,7 +68,7 @@ public class JarUploadHandler extends
        @Override
        protected CompletableFuture<JarUploadResponseBody> handleRequest(
                        @Nonnull final HandlerRequest<FileUpload, 
EmptyMessageParameters> request,
-                       @Nonnull final DispatcherGateway gateway) throws 
RestHandlerException {
+                       @Nonnull final RestfulGateway gateway) throws 
RestHandlerException {
 
                final FileUpload fileUpload = request.getRequestBody();
                return CompletableFuture.supplyAsync(() -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadMessageHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadMessageHeaders.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadMessageHeaders.java
index 82c8d32..a0ec06b 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadMessageHeaders.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadMessageHeaders.java
@@ -30,6 +30,10 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
  */
 public final class JarUploadMessageHeaders implements 
MessageHeaders<FileUpload, JarUploadResponseBody, EmptyMessageParameters> {
 
+       private static final JarUploadMessageHeaders INSTANCE = new 
JarUploadMessageHeaders();
+
+       private JarUploadMessageHeaders() {}
+
        @Override
        public Class<JarUploadResponseBody> getResponseClass() {
                return JarUploadResponseBody.class;
@@ -60,4 +64,7 @@ public final class JarUploadMessageHeaders implements 
MessageHeaders<FileUpload,
                return "/jars/upload";
        }
 
+       public static JarUploadMessageHeaders getInstance() {
+               return INSTANCE;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
index e62ac3e..c882ef1 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebMonitorUtilsTest.java
@@ -33,8 +33,9 @@ import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
-import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.not;
+import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
 
 /**
@@ -57,16 +58,18 @@ public class WebMonitorUtilsTest extends TestLogger {
         * Tests dynamically loading of handlers such as {@link 
JarUploadHandler}.
         */
        @Test
-       public void testTryLoadJarHandlers() {
+       public void testLoadWebSubmissionExtension() throws Exception {
                final Configuration configuration = new Configuration();
                configuration.setString(JobManagerOptions.ADDRESS, "localhost");
-               assertThat(WebMonitorUtils.tryLoadJarHandlers(
+               final WebMonitorExtension webMonitorExtension = 
WebMonitorUtils.loadWebSubmissionExtension(
                        CompletableFuture::new,
                        CompletableFuture.completedFuture("localhost:12345"),
                        Time.seconds(10),
                        Collections.emptyMap(),
                        Paths.get("/tmp"),
                        Executors.directExecutor(),
-                       configuration), not(empty()));
+                       configuration);
+
+               assertThat(webMonitorExtension, is(not(nullValue())));
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
index fd8224c..1eee275 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandlerTest.java
@@ -75,7 +75,7 @@ public class JarUploadHandlerTest extends TestLogger {
                        () -> 
CompletableFuture.completedFuture(mockDispatcherGateway),
                        Time.seconds(10),
                        Collections.emptyMap(),
-                       new JarUploadMessageHeaders(),
+                       JarUploadMessageHeaders.getInstance(),
                        jarDir,
                        Executors.directExecutor());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 10a6239..b5205ab 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -32,9 +32,12 @@ import 
org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
+import org.apache.flink.runtime.webmonitor.WebMonitorExtension;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.FlinkException;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
@@ -52,6 +55,8 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
 
        private final Path uploadDir;
 
+       private WebMonitorExtension webSubmissionExtension;
+
        public DispatcherRestEndpoint(
                        RestServerEndpointConfiguration endpointConfiguration,
                        GatewayRetriever<DispatcherGateway> leaderRetriever,
@@ -77,6 +82,7 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
                        fatalErrorHandler);
 
                uploadDir = endpointConfiguration.getUploadDir();
+               webSubmissionExtension = WebMonitorExtension.empty();
        }
 
        @Override
@@ -101,14 +107,23 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
                        responseHeaders);
 
                if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
-                       handlers.addAll(WebMonitorUtils.tryLoadJarHandlers(
-                               leaderRetriever,
-                               restAddressFuture,
-                               timeout,
-                               responseHeaders,
-                               uploadDir,
-                               executor,
-                               clusterConfiguration));
+                       try {
+                               webSubmissionExtension = 
WebMonitorUtils.loadWebSubmissionExtension(
+                                       leaderRetriever,
+                                       restAddressFuture,
+                                       timeout,
+                                       responseHeaders,
+                                       uploadDir,
+                                       executor,
+                                       clusterConfiguration);
+
+                               // register extension handlers
+                               
handlers.addAll(webSubmissionExtension.getHandlers());
+                       } catch (FlinkException e) {
+                               log.info("Failed to load web based job 
submission extension.", e);
+                       }
+               } else {
+                       log.info("Web-based job submission is not enabled.");
                }
 
                
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), 
blobServerPortHandler));
@@ -116,4 +131,28 @@ public class DispatcherRestEndpoint extends 
WebMonitorEndpoint<DispatcherGateway
 
                return handlers;
        }
+
+       @Override
+       protected CompletableFuture<Void> shutDownInternal() {
+               final CompletableFuture<Void> shutdownFuture = 
super.shutDownInternal();
+
+               final CompletableFuture<Void> shutdownResultFuture = new 
CompletableFuture<>();
+
+               shutdownFuture.whenComplete(
+                       (Void ignored, Throwable throwable) -> {
+                               
webSubmissionExtension.closeAsync().whenComplete(
+                                       (Void innerIgnored, Throwable 
innerThrowable) -> {
+                                               if (innerThrowable != null) {
+                                                       
shutdownResultFuture.completeExceptionally(
+                                                               
ExceptionUtils.firstOrSuppressed(innerThrowable, throwable));
+                                               } else if (throwable != null) {
+                                                       
shutdownResultFuture.completeExceptionally(throwable);
+                                               } else {
+                                                       
shutdownResultFuture.complete(null);
+                                               }
+                                       });
+                       });
+
+               return shutdownResultFuture;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorExtension.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorExtension.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorExtension.java
new file mode 100644
index 0000000..b32ac05
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorExtension.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import org.apache.flink.util.AutoCloseableAsync;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Interface for {@link WebMonitorEndpoint} extensions. Extensions can register
+ * additional handler and allow to close their resources in order to integrate
+ * into the life cycle of the {@link WebMonitorEndpoint}.
+ */
+public interface WebMonitorExtension extends AutoCloseableAsync {
+
+       /**
+        * Gets the collection of extension handlers to register at the {@link 
WebMonitorEndpoint}.
+        *
+        * @return Collection of handlers to register at the {@link 
WebMonitorEndpoint}.
+        */
+       Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> 
getHandlers();
+
+       static WebMonitorExtension empty() {
+               return EmptyWebMonitorExtension.INSTANCE;
+       }
+
+       /**
+        * Empty web monitor extension which can be used as a null object.
+        */
+       enum EmptyWebMonitorExtension implements WebMonitorExtension {
+               INSTANCE;
+
+               @Override
+               public Collection<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> getHandlers() {
+                       return Collections.emptyList();
+               }
+
+               @Override
+               public CompletableFuture<Void> closeAsync() {
+                       return CompletableFuture.completedFuture(null);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ec752138/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index ff24533..24ecf0c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.core.fs.Path;
@@ -33,18 +32,16 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
-import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.FlinkException;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ArrayNode;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -55,9 +52,6 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.URI;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
@@ -211,105 +205,55 @@ public final class WebMonitorUtils {
                }
        }
 
-       @SuppressWarnings({"unchecked", "rawtypes"})
-       public static <T extends RestfulGateway> 
Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> 
tryLoadJarHandlers(
-                       GatewayRetriever<T> leaderRetriever,
+       /**
+        * Loads the {@link WebMonitorExtension} which enables web submission.
+        *
+        * @param leaderRetriever to retrieve the leader
+        * @param restAddressFuture of the underlying REST server endpoint
+        * @param timeout for asynchronous requests
+        * @param responseHeaders for the web submission handlers
+        * @param uploadDir where the web submission handler store uploaded jars
+        * @param executor to run asynchronous operations
+        * @param configuration used to instantiate the web submission extension
+        * @return Web submission extension
+        * @throws FlinkException if the web submission extension could not be 
loaded
+        */
+       public static WebMonitorExtension loadWebSubmissionExtension(
+                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        CompletableFuture<String> restAddressFuture,
                        Time timeout,
                        Map<String, String> responseHeaders,
                        java.nio.file.Path uploadDir,
                        Executor executor,
-                       Configuration configuration) {
-
-               if (!isFlinkRuntimeWebInClassPath()) {
-                       return Collections.emptyList();
-               }
+                       Configuration configuration) throws FlinkException {
 
-               final String jarHandlerPackageName = 
"org.apache.flink.runtime.webmonitor.handlers.";
-               try {
-                       final Constructor<?> jarUploadHandlerConstrutor = Class
-                               .forName(jarHandlerPackageName + 
"JarUploadHandler")
-                               .getConstructor(
-                                       CompletableFuture.class,
-                                       GatewayRetriever.class,
-                                       Time.class,
-                                       Map.class,
-                                       MessageHeaders.class,
-                                       java.nio.file.Path.class,
-                                       Executor.class);
-
-                       final MessageHeaders jarUploadMessageHeaders = 
(MessageHeaders) Class
-                               .forName(jarHandlerPackageName + 
"JarUploadMessageHeaders")
-                               .newInstance();
-
-                       final ChannelInboundHandler jarUploadHandler = 
(ChannelInboundHandler) jarUploadHandlerConstrutor
-                               .newInstance(
-                                       restAddressFuture,
-                                       leaderRetriever,
-                                       timeout,
-                                       responseHeaders,
-                                       jarUploadMessageHeaders,
-                                       uploadDir,
-                                       executor);
-
-                       final Constructor<?> jarListHandlerConstructor = Class
-                               .forName(jarHandlerPackageName + 
"JarListHandler")
-                               .getConstructor(
-                                       CompletableFuture.class,
-                                       GatewayRetriever.class,
-                                       Time.class,
-                                       Map.class,
-                                       MessageHeaders.class,
-                                       File.class,
-                                       Executor.class);
-
-                       final MessageHeaders jarListHeaders = (MessageHeaders) 
Class
-                               .forName(jarHandlerPackageName + 
"JarListHeaders")
-                               .newInstance();
-
-                       final ChannelInboundHandler jarListHandler = 
(ChannelInboundHandler) jarListHandlerConstructor
-                               .newInstance(
-                                       restAddressFuture,
-                                       leaderRetriever,
-                                       timeout,
-                                       responseHeaders,
-                                       jarListHeaders,
-                                       uploadDir.toFile(),
-                                       executor);
-
-                       final Constructor<?> jarRunHandlerConstructor = Class
-                               .forName(jarHandlerPackageName + 
"JarRunHandler")
-                               .getConstructor(
-                                       CompletableFuture.class,
-                                       GatewayRetriever.class,
-                                       Time.class,
-                                       Map.class,
-                                       MessageHeaders.class,
-                                       java.nio.file.Path.class,
-                                       Configuration.class,
-                                       Executor.class);
-
-                       final MessageHeaders jarRunHandlerHeaders = 
(MessageHeaders) Class
-                               .forName(jarHandlerPackageName + 
"JarRunHeaders")
-                               .newInstance();
-
-                       final ChannelInboundHandler jarRunHandler = 
(ChannelInboundHandler) jarRunHandlerConstructor
-                               .newInstance(
+               if (isFlinkRuntimeWebInClassPath()) {
+                       try {
+                               final Constructor<?> 
webSubmissionExtensionConstructor = Class
+                                       
.forName("org.apache.flink.runtime.webmonitor.WebSubmissionExtension")
+                                       .getConstructor(
+                                               Configuration.class,
+                                               CompletableFuture.class,
+                                               GatewayRetriever.class,
+                                               Map.class,
+                                               java.nio.file.Path.class,
+                                               Executor.class,
+                                               Time.class);
+
+                               return (WebMonitorExtension) 
webSubmissionExtensionConstructor.newInstance(
+                                       configuration,
                                        restAddressFuture,
                                        leaderRetriever,
-                                       timeout,
                                        responseHeaders,
-                                       jarRunHandlerHeaders,
                                        uploadDir,
-                                       configuration,
-                                       executor);
-
-                       return Arrays.asList(
-                               Tuple2.of(jarUploadMessageHeaders, 
jarUploadHandler),
-                               Tuple2.of(jarListHeaders, jarListHandler),
-                               Tuple2.of(jarRunHandlerHeaders, jarRunHandler));
-               } catch (ClassNotFoundException | InvocationTargetException | 
InstantiationException | NoSuchMethodException | IllegalAccessException e) {
-                       throw new RuntimeException(e);
+                                       executor,
+                                       timeout);
+                       } catch (ClassNotFoundException | NoSuchMethodException 
| InstantiationException | InvocationTargetException | IllegalAccessException 
e) {
+                               throw new FlinkException("Could not load web 
submission extension.", e);
+                       }
+               } else {
+                       throw new FlinkException("The module flink-runtime-web 
could not be found in the class path. Please add " +
+                               "this jar in order to enable web based job 
submission.");
                }
        }
 

Reply via email to