http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index f8245c2..10a6239 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -107,7 +107,8 @@ public class DispatcherRestEndpoint extends WebMonitorEndpoint<DispatcherGateway timeout, responseHeaders, uploadDir, - executor)); + executor, + clusterConfiguration)); } handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler));
http://git-wip-us.apache.org/repos/asf/flink/blob/ab8e9bdb/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index 832b2e5..ff24533 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -218,7 +218,8 @@ public final class WebMonitorUtils { Time timeout, Map<String, String> responseHeaders, java.nio.file.Path uploadDir, - Executor executor) { + Executor executor, + Configuration configuration) { if (!isFlinkRuntimeWebInClassPath()) { return Collections.emptyList(); @@ -276,9 +277,37 @@ public final class WebMonitorUtils { uploadDir.toFile(), executor); + final Constructor<?> jarRunHandlerConstructor = Class + .forName(jarHandlerPackageName + "JarRunHandler") + .getConstructor( + CompletableFuture.class, + GatewayRetriever.class, + Time.class, + Map.class, + MessageHeaders.class, + java.nio.file.Path.class, + Configuration.class, + Executor.class); + + final MessageHeaders jarRunHandlerHeaders = (MessageHeaders) Class + .forName(jarHandlerPackageName + "JarRunHeaders") + .newInstance(); + + final ChannelInboundHandler jarRunHandler = (ChannelInboundHandler) jarRunHandlerConstructor + .newInstance( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders, + jarRunHandlerHeaders, + uploadDir, + configuration, + executor); + return Arrays.asList( Tuple2.of(jarUploadMessageHeaders, jarUploadHandler), - Tuple2.of(jarListHeaders, jarListHandler)); + Tuple2.of(jarListHeaders, jarListHandler), + Tuple2.of(jarRunHandlerHeaders, jarRunHandler)); } catch (ClassNotFoundException | InvocationTargetException | InstantiationException | NoSuchMethodException | IllegalAccessException e) { throw new RuntimeException(e); }