Repository: flink
Updated Branches:
  refs/heads/master 2cb37cb93 -> 55b76d54f


[FLINK-7532] Add web content handler to DispatcherRestEndpoint

Adds the StaticFileServerHandler to the DispatcherRestEndpoint if the
flink-runtime-web dependency is in the classpath. In order to set up the
respective channel handler, this commit introduces the setupChannelHandlers
method to the RestServerEndpoint.

Refactor RestServerEndpoint#initializeHandlers to support 
StaticFileServerHandler registration

This closes #4601.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55b76d54
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55b76d54
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55b76d54

Branch: refs/heads/master
Commit: 55b76d54f0dcc4bdaa96eaa463ce0bfcad23d239
Parents: 2cb37cb
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Aug 18 14:05:11 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 20 10:35:30 2017 +0200

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      | 62 ++++++++++++++++++--
 .../entrypoint/SessionClusterEntrypoint.java    | 53 ++++++++++++++++-
 .../flink/runtime/rest/RestServerEndpoint.java  | 29 +++++----
 .../rest/handler/RestHandlerSpecification.java  | 41 +++++++++++++
 .../files/WebContentHandlerSpecification.java   | 46 +++++++++++++++
 .../runtime/rest/messages/MessageHeaders.java   | 18 +-----
 .../runtime/webmonitor/WebMonitorUtils.java     | 39 ++++++++++++
 .../flink/runtime/rest/RestEndpointITCase.java  |  7 ++-
 8 files changed, 260 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 63b1a4c..debd674 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -18,24 +18,78 @@
 
 package org.apache.flink.runtime.dispatcher;
 
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rest.RestServerEndpoint;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
+
+import java.io.File;
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * REST endpoint for the {@link Dispatcher} component.
  */
 public class DispatcherRestEndpoint extends RestServerEndpoint {
 
-       public DispatcherRestEndpoint(RestServerEndpointConfiguration 
configuration) {
+       private final GatewayRetriever<DispatcherGateway> leaderRetriever;
+       private final Time timeout;
+       private final File tmpDir;
+
+       public DispatcherRestEndpoint(
+                       RestServerEndpointConfiguration configuration,
+                       GatewayRetriever<DispatcherGateway> leaderRetriever,
+                       Time timeout,
+                       File tmpDir) {
                super(configuration);
+               this.leaderRetriever = 
Preconditions.checkNotNull(leaderRetriever);
+               this.timeout = Preconditions.checkNotNull(timeout);
+               this.tmpDir = Preconditions.checkNotNull(tmpDir);
+       }
+
+       @Override
+       protected Collection<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> 
restAddressFuture) {
+               Optional<StaticFileServerHandler<DispatcherGateway>> 
optWebContent;
+
+               try {
+                       optWebContent = WebMonitorUtils.tryLoadWebContent(
+                               leaderRetriever,
+                               restAddressFuture,
+                               timeout,
+                               tmpDir);
+               } catch (IOException e) {
+                       log.warn("Could not load web content handler.", e);
+                       optWebContent = Optional.empty();
+               }
+
+               return optWebContent
+                       .map(webContent ->
+                               Collections.singleton(
+                                       Tuple2.<RestHandlerSpecification, 
ChannelInboundHandler>of(WebContentHandlerSpecification.getInstance(), 
webContent)))
+                       .orElseGet(() -> Collections.emptySet());
        }
 
        @Override
-       protected Collection<AbstractRestHandler<?, ?, ?, ?>> 
initializeHandlers() {
-               return Collections.emptySet();
+       public void shutdown(Time timeout) {
+               super.shutdown(timeout);
+
+               try {
+                       log.info("Removing cache directory {}", tmpDir);
+                       FileUtils.deleteDirectory(tmpDir);
+               } catch (Throwable t) {
+                       log.warn("Error while deleting cache directory {}", 
tmpDir, t);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
index 0d14443..80bd384 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java
@@ -18,22 +18,30 @@
 
 package org.apache.flink.runtime.entrypoint;
 
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.dispatcher.Dispatcher;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.dispatcher.DispatcherId;
 import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
 import org.apache.flink.runtime.dispatcher.StandaloneDispatcher;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManager;
 import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
+import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
+import java.io.File;
 import java.util.Optional;
 
 /**
@@ -45,6 +53,8 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
 
        private Dispatcher dispatcher;
 
+       private LeaderRetrievalService dispatcherLeaderRetrievalService;
+
        private DispatcherRestEndpoint dispatcherRestEndpoint;
 
        public SessionClusterEntrypoint(Configuration configuration) {
@@ -60,8 +70,18 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                        HeartbeatServices heartbeatServices,
                        MetricRegistry metricRegistry) throws Exception {
 
-               dispatcherRestEndpoint = new DispatcherRestEndpoint(
-                       
RestServerEndpointConfiguration.fromConfiguration(configuration));
+               dispatcherLeaderRetrievalService = 
highAvailabilityServices.getDispatcherLeaderRetriever();
+
+               LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
+                       rpcService,
+                       DispatcherGateway.class,
+                       uuid -> new DispatcherId(uuid),
+                       10,
+                       Time.milliseconds(50L));
+
+               dispatcherRestEndpoint = createDispatcherRestEndpoint(
+                       configuration,
+                       dispatcherGatewayRetriever);
 
                LOG.debug("Starting Dispatcher REST endpoint.");
                dispatcherRestEndpoint.start();
@@ -90,17 +110,30 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
 
                LOG.debug("Starting Dispatcher.");
                dispatcher.start();
+               
dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever);
        }
 
        @Override
        protected void stopClusterComponents(boolean cleanupHaData) throws 
Exception {
                Throwable exception = null;
 
+               if (dispatcherRestEndpoint != null) {
+                       dispatcherRestEndpoint.shutdown(Time.seconds(10L));
+               }
+
+               if (dispatcherLeaderRetrievalService != null) {
+                       try {
+                               dispatcherLeaderRetrievalService.stop();
+                       } catch (Throwable t) {
+                               exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
+                       }
+               }
+
                if (dispatcher != null) {
                        try {
                                dispatcher.shutDown();
                        } catch (Throwable t) {
-                               exception = t;
+                               exception = ExceptionUtils.firstOrSuppressed(t, 
exception);
                        }
                }
 
@@ -117,6 +150,20 @@ public abstract class SessionClusterEntrypoint extends 
ClusterEntrypoint {
                }
        }
 
+       protected DispatcherRestEndpoint createDispatcherRestEndpoint(
+               Configuration configuration,
+               LeaderGatewayRetriever<DispatcherGateway> 
dispatcherGatewayRetriever) throws Exception {
+
+               Time timeout = 
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT));
+               File tmpDir = new 
File(configuration.getString(WebOptions.TMP_DIR));
+
+               return new DispatcherRestEndpoint(
+                       
RestServerEndpointConfiguration.fromConfiguration(configuration),
+                       dispatcherGatewayRetriever,
+                       timeout,
+                       tmpDir);
+       }
+
        protected Dispatcher createDispatcher(
                Configuration configuration,
                RpcService rpcService,

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
index fc37381..ec6e5b0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java
@@ -19,16 +19,16 @@
 package org.apache.flink.runtime.rest;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.RouterHandler;
-import org.apache.flink.runtime.rest.messages.RequestBody;
-import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer;
 import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup;
 import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
@@ -82,13 +82,18 @@ public abstract class RestServerEndpoint {
        /**
         * This method is called at the beginning of {@link #start()} to setup 
all handlers that the REST server endpoint
         * implementation requires.
+        *
+        * @param restAddressFuture future rest address of the 
RestServerEndpoint
+        * @return Collection of RestHandlerSpecification and ChannelInboundHandler 
pairs which are added to the server endpoint
         */
-       protected abstract Collection<AbstractRestHandler<?, ?, ?, ?>> 
initializeHandlers();
+       protected abstract Collection<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> 
restAddressFuture);
 
        /**
         * Starts this REST server endpoint.
+        *
+        * @throws Exception if we cannot start the RestServerEndpoint
         */
-       public void start() {
+       public void start() throws Exception {
                synchronized (lock) {
                        if (started) {
                                // RestServerEndpoint already started
@@ -98,8 +103,9 @@ public abstract class RestServerEndpoint {
                        log.info("Starting rest endpoint.");
 
                        final Router router = new Router();
+                       final CompletableFuture<String> restAddressFuture = new 
CompletableFuture<>();
 
-                       initializeHandlers().forEach(handler -> 
registerHandler(router, handler));
+                       initializeHandlers(restAddressFuture).forEach(handler 
-> registerHandler(router, handler));
 
                        ChannelInitializer<SocketChannel> initializer = new 
ChannelInitializer<SocketChannel>() {
 
@@ -150,8 +156,11 @@ public abstract class RestServerEndpoint {
                        } else {
                                protocol = "http://";;
                        }
+
                        restAddress = protocol + address + ':' + port;
 
+                       restAddressFuture.complete(restAddress);
+
                        started = true;
                }
        }
@@ -239,13 +248,13 @@ public abstract class RestServerEndpoint {
                }
        }
 
-       private static <R extends RequestBody, P extends ResponseBody> void 
registerHandler(Router router, AbstractRestHandler<?, R, P, ?> handler) {
-               switch (handler.getMessageHeaders().getHttpMethod()) {
+       private static void registerHandler(Router router, 
Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) {
+               switch (specificationHandler.f0.getHttpMethod()) {
                        case GET:
-                               
router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+                               
router.GET(specificationHandler.f0.getTargetRestEndpointURL(), 
specificationHandler.f1);
                                break;
                        case POST:
-                               
router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler);
+                               
router.POST(specificationHandler.f0.getTargetRestEndpointURL(), 
specificationHandler.f1);
                                break;
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
new file mode 100644
index 0000000..4ebcd49
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+
+/**
+ * Rest handler interface which all rest handler implementations have to 
implement.
+ */
+public interface RestHandlerSpecification {
+
+       /**
+        * Returns the {@link HttpMethodWrapper} to be used for the request.
+        *
+        * @return http method to be used for the request
+        */
+       HttpMethodWrapper getHttpMethod();
+
+       /**
+        * Returns the generalized endpoint url that this request should be 
sent to, for example {@code /job/:jobid}.
+        *
+        * @return endpoint url that this request should be sent to
+        */
+       String getTargetRestEndpointURL();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java
new file mode 100644
index 0000000..98b805a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.files;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
+
+/**
+ * Rest handler specification for the web content handler.
+ */
+public final class WebContentHandlerSpecification implements 
RestHandlerSpecification {
+
+       private static final WebContentHandlerSpecification INSTANCE = new 
WebContentHandlerSpecification();
+
+       private WebContentHandlerSpecification() {}
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return "/:*";
+       }
+
+       public static WebContentHandlerSpecification getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
index 254c231..e5ec794 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages;
 
-import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
@@ -31,7 +31,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
  * @param <P> response message type
  * @param <M> message parameters type
  */
-public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, 
M extends MessageParameters> {
+public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, 
M extends MessageParameters> extends RestHandlerSpecification {
 
        /**
         * Returns the class of the request message.
@@ -41,20 +41,6 @@ public interface MessageHeaders<R extends RequestBody, P 
extends ResponseBody, M
        Class<R> getRequestClass();
 
        /**
-        * Returns the {@link HttpMethodWrapper} to be used for the request.
-        *
-        * @return http method to be used for the request
-        */
-       HttpMethodWrapper getHttpMethod();
-
-       /**
-        * Returns the generalized endpoint url that this request should be 
sent to, for example {@code /job/:jobid}.
-        *
-        * @return endpoint url that this request should be sent to
-        */
-       String getTargetRestEndpointURL();
-
-       /**
         * Returns the class of the response message.
         *
         * @return class of the response message

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
index 57996bd..e0f1823 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java
@@ -32,7 +32,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
 import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
 
@@ -43,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
+import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
@@ -50,6 +53,8 @@ import java.net.URI;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /**
@@ -170,6 +175,40 @@ public final class WebMonitorUtils {
                }
        }
 
+       /**
+        * Checks whether the flink-runtime-web dependency is available and if 
so returns a
+        * StaticFileServerHandler which can serve the static file contents.
+        *
+        * @param leaderRetriever to be used by the StaticFileServerHandler
+        * @param restAddressFuture of the underlying REST server endpoint
+        * @param timeout for lookup requests
+        * @param tmpDir to be used by the StaticFileServerHandler to store 
temporary files
+        * @param <T> type of the gateway to retrieve
+        * @return Optional containing the StaticFileServerHandler if flink-runtime-web is in the 
classpath; otherwise Optional.empty
+        * @throws IOException if we cannot create the StaticFileServerHandler
+        */
+       public static <T extends RestfulGateway> 
Optional<StaticFileServerHandler<T>> tryLoadWebContent(
+                       GatewayRetriever<T> leaderRetriever,
+                       CompletableFuture<String> restAddressFuture,
+                       Time timeout,
+                       File tmpDir) throws IOException {
+
+               // 1. Check if flink-runtime-web is in the classpath
+               try {
+                       final String classname = 
"org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";
+                       Class.forName(classname).asSubclass(WebMonitor.class);
+
+                       return Optional.of(new StaticFileServerHandler<>(
+                               leaderRetriever,
+                               restAddressFuture,
+                               timeout,
+                               tmpDir));
+               } catch (ClassNotFoundException ignored) {
+                       // class not found means that there is no 
flink-runtime-web in the classpath
+                       return Optional.empty();
+               }
+       }
+
        public static JsonArchivist[] getJsonArchivists() {
                try {
                        String classname = 
"org.apache.flink.runtime.webmonitor.WebRuntimeMonitor";

http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
index 4e06d1f..8dfb5ad 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -20,10 +20,12 @@ package org.apache.flink.runtime.rest;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.MessageParameters;
 import org.apache.flink.runtime.rest.messages.MessagePathParameter;
@@ -37,6 +39,7 @@ import 
org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
@@ -137,8 +140,8 @@ public class RestEndpointITCase extends TestLogger {
                }
 
                @Override
-               protected Collection<AbstractRestHandler<?, ?, ?, ?>> 
initializeHandlers() {
-                       return Collections.singleton(testHandler);
+               protected Collection<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> 
restAddressFuture) {
+                       return Collections.singleton(Tuple2.of(new 
TestHeaders(), testHandler));
                }
        }
 

Reply via email to