Repository: flink Updated Branches: refs/heads/master 2cb37cb93 -> 55b76d54f
[FLINK-7532] Add web content handler to DispatcherRestEndpoint Adds the StaticFileContentHandler to the DispatcherRestEndpoint if the flink-runtime-web dependency is in the classpath. In order to set up the respective channel handler, this commit introduces the setupChannelHandlers method to the RestServerEndpoint. Refactor RestServerEndpoint#initializeHandler to support StaticFileServerHandler registration. This closes #4601. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/55b76d54 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/55b76d54 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/55b76d54 Branch: refs/heads/master Commit: 55b76d54f0dcc4bdaa96eaa463ce0bfcad23d239 Parents: 2cb37cb Author: Till Rohrmann <trohrm...@apache.org> Authored: Fri Aug 18 14:05:11 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Wed Sep 20 10:35:30 2017 +0200 ---------------------------------------------------------------------- .../dispatcher/DispatcherRestEndpoint.java | 62 ++++++++++++++++++-- .../entrypoint/SessionClusterEntrypoint.java | 53 ++++++++++++++++- .../flink/runtime/rest/RestServerEndpoint.java | 29 +++++---- .../rest/handler/RestHandlerSpecification.java | 41 +++++++++++++ .../files/WebContentHandlerSpecification.java | 46 +++++++++++++++ .../runtime/rest/messages/MessageHeaders.java | 18 +----- .../runtime/webmonitor/WebMonitorUtils.java | 39 ++++++++++++ .../flink/runtime/rest/RestEndpointITCase.java | 7 ++- 8 files changed, 260 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 63b1a4c..debd674 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -18,24 +18,78 @@ package org.apache.flink.runtime.dispatcher; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.rest.RestServerEndpoint; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; -import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; +import org.apache.flink.runtime.rest.handler.legacy.files.WebContentHandlerSpecification; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; + +import java.io.File; +import java.io.IOException; import java.util.Collection; import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; /** * REST endpoint for the {@link Dispatcher} component. 
*/ public class DispatcherRestEndpoint extends RestServerEndpoint { - public DispatcherRestEndpoint(RestServerEndpointConfiguration configuration) { + private final GatewayRetriever<DispatcherGateway> leaderRetriever; + private final Time timeout; + private final File tmpDir; + + public DispatcherRestEndpoint( + RestServerEndpointConfiguration configuration, + GatewayRetriever<DispatcherGateway> leaderRetriever, + Time timeout, + File tmpDir) { super(configuration); + this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever); + this.timeout = Preconditions.checkNotNull(timeout); + this.tmpDir = Preconditions.checkNotNull(tmpDir); + } + + @Override + protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) { + Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent; + + try { + optWebContent = WebMonitorUtils.tryLoadWebContent( + leaderRetriever, + restAddressFuture, + timeout, + tmpDir); + } catch (IOException e) { + log.warn("Could not load web content handler.", e); + optWebContent = Optional.empty(); + } + + return optWebContent + .map(webContent -> + Collections.singleton( + Tuple2.<RestHandlerSpecification, ChannelInboundHandler>of(WebContentHandlerSpecification.getInstance(), webContent))) + .orElseGet(() -> Collections.emptySet()); } @Override - protected Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers() { - return Collections.emptySet(); + public void shutdown(Time timeout) { + super.shutdown(timeout); + + try { + log.info("Removing cache directory {}", tmpDir); + FileUtils.deleteDirectory(tmpDir); + } catch (Throwable t) { + log.warn("Error while deleting cache directory {}", tmpDir, t); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java index 0d14443..80bd384 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/entrypoint/SessionClusterEntrypoint.java @@ -18,22 +18,30 @@ package org.apache.flink.runtime.entrypoint; +import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.dispatcher.Dispatcher; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.dispatcher.DispatcherId; import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint; import org.apache.flink.runtime.dispatcher.StandaloneDispatcher; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.resourcemanager.ResourceManager; import org.apache.flink.runtime.rest.RestServerEndpointConfiguration; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; +import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import java.io.File; import java.util.Optional; /** @@ -45,6 +53,8 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { private Dispatcher dispatcher; + private 
LeaderRetrievalService dispatcherLeaderRetrievalService; + private DispatcherRestEndpoint dispatcherRestEndpoint; public SessionClusterEntrypoint(Configuration configuration) { @@ -60,8 +70,18 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { HeartbeatServices heartbeatServices, MetricRegistry metricRegistry) throws Exception { - dispatcherRestEndpoint = new DispatcherRestEndpoint( - RestServerEndpointConfiguration.fromConfiguration(configuration)); + dispatcherLeaderRetrievalService = highAvailabilityServices.getDispatcherLeaderRetriever(); + + LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever = new RpcGatewayRetriever<>( + rpcService, + DispatcherGateway.class, + uuid -> new DispatcherId(uuid), + 10, + Time.milliseconds(50L)); + + dispatcherRestEndpoint = createDispatcherRestEndpoint( + configuration, + dispatcherGatewayRetriever); LOG.debug("Starting Dispatcher REST endpoint."); dispatcherRestEndpoint.start(); @@ -90,17 +110,30 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { LOG.debug("Starting Dispatcher."); dispatcher.start(); + dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever); } @Override protected void stopClusterComponents(boolean cleanupHaData) throws Exception { Throwable exception = null; + if (dispatcherRestEndpoint != null) { + dispatcherRestEndpoint.shutdown(Time.seconds(10L)); + } + + if (dispatcherLeaderRetrievalService != null) { + try { + dispatcherLeaderRetrievalService.stop(); + } catch (Throwable t) { + exception = ExceptionUtils.firstOrSuppressed(t, exception); + } + } + if (dispatcher != null) { try { dispatcher.shutDown(); } catch (Throwable t) { - exception = t; + exception = ExceptionUtils.firstOrSuppressed(t, exception); } } @@ -117,6 +150,20 @@ public abstract class SessionClusterEntrypoint extends ClusterEntrypoint { } } + protected DispatcherRestEndpoint createDispatcherRestEndpoint( + Configuration configuration, + 
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever) throws Exception { + + Time timeout = Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT)); + File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR)); + + return new DispatcherRestEndpoint( + RestServerEndpointConfiguration.fromConfiguration(configuration), + dispatcherGatewayRetriever, + timeout, + tmpDir); + } + protected Dispatcher createDispatcher( Configuration configuration, RpcService rpcService, http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java index fc37381..ec6e5b0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestServerEndpoint.java @@ -19,16 +19,16 @@ package org.apache.flink.runtime.rest; import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.rest.handler.PipelineErrorHandler; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.handler.RouterHandler; -import org.apache.flink.runtime.rest.messages.RequestBody; -import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.netty4.io.netty.bootstrap.ServerBootstrap; import org.apache.flink.shaded.netty4.io.netty.channel.Channel; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInitializer; import org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoopGroup; import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel; @@ -82,13 +82,18 @@ public abstract class RestServerEndpoint { /** * This method is called at the beginning of {@link #start()} to setup all handlers that the REST server endpoint * implementation requires. + * + * @param restAddressFuture future rest address of the RestServerEndpoint + * @return Collection of AbstractRestHandler which are added to the server endpoint */ - protected abstract Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers(); + protected abstract Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture); /** * Starts this REST server endpoint. + * + * @throws Exception if we cannot start the RestServerEndpoint */ - public void start() { + public void start() throws Exception { synchronized (lock) { if (started) { // RestServerEndpoint already started @@ -98,8 +103,9 @@ public abstract class RestServerEndpoint { log.info("Starting rest endpoint."); final Router router = new Router(); + final CompletableFuture<String> restAddressFuture = new CompletableFuture<>(); - initializeHandlers().forEach(handler -> registerHandler(router, handler)); + initializeHandlers(restAddressFuture).forEach(handler -> registerHandler(router, handler)); ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { @@ -150,8 +156,11 @@ public abstract class RestServerEndpoint { } else { protocol = "http://"; } + restAddress = protocol + address + ':' + port; + restAddressFuture.complete(restAddress); + started = true; } } @@ -239,13 +248,13 @@ public abstract class RestServerEndpoint { } } - private static <R extends RequestBody, P extends ResponseBody> void registerHandler(Router router, AbstractRestHandler<?, R, P, ?> handler) { - switch 
(handler.getMessageHeaders().getHttpMethod()) { + private static void registerHandler(Router router, Tuple2<RestHandlerSpecification, ChannelInboundHandler> specificationHandler) { + switch (specificationHandler.f0.getHttpMethod()) { case GET: - router.GET(handler.getMessageHeaders().getTargetRestEndpointURL(), handler); + router.GET(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1); break; case POST: - router.POST(handler.getMessageHeaders().getTargetRestEndpointURL(), handler); + router.POST(specificationHandler.f0.getTargetRestEndpointURL(), specificationHandler.f1); break; } } http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java new file mode 100644 index 0000000..4ebcd49 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerSpecification.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; + +/** + * Rest handler interface which all rest handler implementation have to implement. + */ +public interface RestHandlerSpecification { + + /** + * Returns the {@link HttpMethodWrapper} to be used for the request. + * + * @return http method to be used for the request + */ + HttpMethodWrapper getHttpMethod(); + + /** + * Returns the generalized endpoint url that this request should be sent to, for example {@code /job/:jobid}. + * + * @return endpoint url that this request should be sent to + */ + String getTargetRestEndpointURL(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java new file mode 100644 index 0000000..98b805a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/WebContentHandlerSpecification.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.files; + +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; + +/** + * Rest handler specification for the web content handler. + */ +public final class WebContentHandlerSpecification implements RestHandlerSpecification { + + private static final WebContentHandlerSpecification INSTANCE = new WebContentHandlerSpecification(); + + private WebContentHandlerSpecification() {} + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return "/:*"; + } + + public static WebContentHandlerSpecification getInstance() { + return INSTANCE; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java index 254c231..e5ec794 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java @@ -18,7 +18,7 @@ package org.apache.flink.runtime.rest.messages; -import org.apache.flink.runtime.rest.HttpMethodWrapper; +import 
org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; @@ -31,7 +31,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt * @param <P> response message type * @param <M> message parameters type */ -public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> { +public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M extends MessageParameters> extends RestHandlerSpecification { /** * Returns the class of the request message. @@ -41,20 +41,6 @@ public interface MessageHeaders<R extends RequestBody, P extends ResponseBody, M Class<R> getRequestClass(); /** - * Returns the {@link HttpMethodWrapper} to be used for the request. - * - * @return http method to be used for the request - */ - HttpMethodWrapper getHttpMethod(); - - /** - * Returns the generalized endpoint url that this request should be sent to, for example {@code /job/:jobid}. - * - * @return endpoint url that this request should be sent to - */ - String getTargetRestEndpointURL(); - - /** * Returns the class of the response message. 
* * @return class of the response message http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index 57996bd..e0f1823 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -32,7 +32,9 @@ import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever; import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; @@ -43,6 +45,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -50,6 +53,8 @@ import java.net.URI; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; /** @@ -170,6 +175,40 @@ public final class WebMonitorUtils { } } + /** + * Checks whether the flink-runtime-web dependency is available and if so returns a + * StaticFileServerHandler which can serve the static file 
contents. + * + * @param leaderRetriever to be used by the StaticFileServerHandler + * @param restAddressFuture of the underlying REST server endpoint + * @param timeout for lookup requests + * @param tmpDir to be used by the StaticFileServerHandler to store temporary files + * @param <T> type of the gateway to retrieve + * @return StaticFileServerHandler if flink-runtime-web is in the classpath; Otherwise Optional.empty + * @throws IOException if we cannot create the StaticFileServerHandler + */ + public static <T extends RestfulGateway> Optional<StaticFileServerHandler<T>> tryLoadWebContent( + GatewayRetriever<T> leaderRetriever, + CompletableFuture<String> restAddressFuture, + Time timeout, + File tmpDir) throws IOException { + + // 1. Check if flink-runtime-web is in the classpath + try { + final String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"; + Class.forName(classname).asSubclass(WebMonitor.class); + + return Optional.of(new StaticFileServerHandler<>( + leaderRetriever, + restAddressFuture, + timeout, + tmpDir)); + } catch (ClassNotFoundException ignored) { + // class not found means that there is no flink-runtime-web in the classpath + return Optional.empty(); + } + } + public static JsonArchivist[] getJsonArchivists() { try { String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"; http://git-wip-us.apache.org/repos/asf/flink/blob/55b76d54/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java index 4e06d1f..8dfb5ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java @@ -20,10 +20,12 @@ package 
org.apache.flink.runtime.rest; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.MessageParameters; import org.apache.flink.runtime.rest.messages.MessagePathParameter; @@ -37,6 +39,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import com.fasterxml.jackson.annotation.JsonCreator; @@ -137,8 +140,8 @@ public class RestEndpointITCase extends TestLogger { } @Override - protected Collection<AbstractRestHandler<?, ?, ?, ?>> initializeHandlers() { - return Collections.singleton(testHandler); + protected Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) { + return Collections.singleton(Tuple2.of(new TestHeaders(), testHandler)); } }