[FLINK-4787] [runtime-web] Return generic HttpResponse in RequestHandler - Let RequestHandler return a generic HttpResponse instead of a String. This enables handlers to return custom reponses (differnt http codes, etc.) - Introduce AbstractJsonRequestHandler for default JSON responses
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fb60091 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fb60091 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fb60091 Branch: refs/heads/master Commit: 2fb600916860acf2256464659ca60424bbf26857 Parents: e9b20ec Author: Ufuk Celebi <[email protected]> Authored: Tue Oct 11 10:08:14 2016 +0200 Committer: Ufuk Celebi <[email protected]> Committed: Fri Oct 28 11:04:12 2016 +0200 ---------------------------------------------------------------------- .../webmonitor/RuntimeMonitorHandler.java | 23 +++--- .../AbstractExecutionGraphRequestHandler.java | 4 +- .../handlers/AbstractJsonRequestHandler.java | 73 ++++++++++++++++++++ .../handlers/ClusterOverviewHandler.java | 4 +- .../handlers/CurrentJobIdsHandler.java | 5 +- .../handlers/CurrentJobsOverviewHandler.java | 4 +- .../handlers/DashboardConfigHandler.java | 4 +- .../handlers/JarAccessDeniedHandler.java | 4 +- .../webmonitor/handlers/JarActionHandler.java | 2 +- .../webmonitor/handlers/JarDeleteHandler.java | 4 +- .../webmonitor/handlers/JarListHandler.java | 4 +- .../webmonitor/handlers/JarPlanHandler.java | 2 +- .../webmonitor/handlers/JarRunHandler.java | 2 +- .../webmonitor/handlers/JarUploadHandler.java | 4 +- .../handlers/JobCancellationHandler.java | 4 +- .../handlers/JobManagerConfigHandler.java | 4 +- .../webmonitor/handlers/JobStoppingHandler.java | 4 +- .../webmonitor/handlers/RequestHandler.java | 16 +++-- .../handlers/TaskManagersHandler.java | 5 +- .../metrics/AbstractMetricsHandler.java | 6 +- .../metrics/AbstractMetricsHandlerTest.java | 6 +- 21 files changed, 130 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java index 5008a8c..aba4e17 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java @@ -23,6 +23,7 @@ import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; import io.netty.handler.codec.http.HttpHeaders; import io.netty.handler.codec.http.HttpResponseStatus; import io.netty.handler.codec.http.HttpVersion; @@ -61,7 +62,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase { public static final String WEB_MONITOR_ADDRESS_KEY = "web.monitor.address"; - private final RequestHandler handler; + private final RequestHandler handler; public RuntimeMonitorHandler( RequestHandler handler, @@ -75,7 +76,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase { @Override protected void respondAsLeader(ChannelHandlerContext ctx, Routed routed, ActorGateway jobManager) { - DefaultFullHttpResponse response; + FullHttpResponse response; try { // we only pass the first element in the list to the handlers. @@ -93,14 +94,7 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase { queryParams.put(WEB_MONITOR_ADDRESS_KEY, (httpsEnabled ? "https://" : "http://") + address.getHostName() + ":" + address.getPort()); - String result = handler.handleRequest(pathParams, queryParams, jobManager); - byte[] bytes = result.getBytes(ENCODING); - - response = new DefaultFullHttpResponse( - HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes)); - - response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*"); - response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json"); + response = handler.handleRequest(pathParams, queryParams, jobManager); } catch (NotFoundException e) { // this should result in a 404 error code (not found) @@ -108,6 +102,8 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase { : Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING)); response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, message); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain"); + response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8"); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); LOG.debug("Error while handling request", e); } catch (Exception e) { @@ -115,11 +111,14 @@ public class RuntimeMonitorHandler extends RuntimeMonitorHandlerBase { response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes)); response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain"); + response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8"); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + LOG.debug("Error while handling request", e); } - response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8"); - response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*"); + KeepAliveWrite.flush(ctx, routed.request(), response); } } http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java index ff28d4e..8cd70e9 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java @@ -30,7 +30,7 @@ import java.util.Map; * Base class for request handlers whose response depends on an ExecutionGraph * that can be retrieved via "jobid" parameter. */ -public abstract class AbstractExecutionGraphRequestHandler implements RequestHandler { +public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonRequestHandler { private final ExecutionGraphHolder executionGraphHolder; @@ -39,7 +39,7 @@ public abstract class AbstractExecutionGraphRequestHandler implements RequestHan } @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { String jidString = pathParams.get("jobid"); if (jidString == null) { throw new RuntimeException("JobId parameter missing"); http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java new file mode 100644 index 0000000..ae163cb --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import io.netty.buffer.Unpooled; +import io.netty.handler.codec.http.DefaultFullHttpResponse; +import io.netty.handler.codec.http.FullHttpResponse; +import io.netty.handler.codec.http.HttpHeaders; +import io.netty.handler.codec.http.HttpResponseStatus; +import io.netty.handler.codec.http.HttpVersion; +import org.apache.flink.runtime.instance.ActorGateway; + +import java.nio.charset.Charset; +import java.util.Map; + +/** + * Base class for most request handlers. The handlers must produce a JSON response. + */ +public abstract class AbstractJsonRequestHandler implements RequestHandler { + + private static final Charset ENCODING = Charset.forName("UTF-8"); + + @Override + public FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + String result = handleJsonRequest(pathParams, queryParams, jobManager); + byte[] bytes = result.getBytes(ENCODING); + + DefaultFullHttpResponse response = new DefaultFullHttpResponse( + HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes)); + + response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json"); + response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes()); + + return response; + } + + /** + * Core method that handles the request and generates the response. The method needs to + * respond with a valid JSON string. Exceptions may be thrown and will be handled. + * + * @param pathParams The map of REST path parameters, decoded by the router. + * @param queryParams The map of query parameters. + * @param jobManager The JobManager actor. + * + * @return The JSON string that is the HTTP response. + * + * @throws Exception Handlers may forward exceptions. Exceptions of type + * {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404 + * response with the exception message, other exceptions will cause a HTTP 500 response + * with the exception stack trace. + */ + public abstract String handleJsonRequest( + Map<String, String> pathParams, + Map<String, String> queryParams, + ActorGateway jobManager) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java index b7389c4..99ef3d9 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java @@ -36,7 +36,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * Responder that returns the status of the Flink cluster, such as how many * TaskManagers are currently connected, and how many jobs are running. */ -public class ClusterOverviewHandler implements RequestHandler { +public class ClusterOverviewHandler extends AbstractJsonRequestHandler { private static final String version = EnvironmentInformation.getVersion(); @@ -49,7 +49,7 @@ public class ClusterOverviewHandler implements RequestHandler { } @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { // we need no parameters, get all requests try { if (jobManager != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java index 11f2a3b..b690c56 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview; import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview; - import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -38,7 +37,7 @@ import static java.util.Objects.requireNonNull; * May serve the IDs of current jobs, or past jobs, depending on whether this handler is * given the JobManager or Archive Actor Reference. */ -public class CurrentJobIdsHandler implements RequestHandler { +public class CurrentJobIdsHandler extends AbstractJsonRequestHandler { private final FiniteDuration timeout; @@ -47,7 +46,7 @@ public class CurrentJobIdsHandler implements RequestHandler { } @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { // we need no parameters, get all requests try { if (jobManager != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java index 571f911..07064da 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java @@ -36,7 +36,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * Request handler that returns a summary of the job status. */ -public class CurrentJobsOverviewHandler implements RequestHandler { +public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler { private final FiniteDuration timeout; @@ -55,7 +55,7 @@ public class CurrentJobsOverviewHandler implements RequestHandler { } @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { if (jobManager != null) { Future<Object> future = jobManager.ask( http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java index debb24c..6fe072b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java @@ -31,7 +31,7 @@ import java.util.TimeZone; * against this web server should behave. It defines for example the refresh interval, * and time zone of the server timestamps. */ -public class DashboardConfigHandler implements RequestHandler { +public class DashboardConfigHandler extends AbstractJsonRequestHandler { private final String configString; @@ -67,7 +67,7 @@ public class DashboardConfigHandler implements RequestHandler { } @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { return this.configString; } } http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java index 67673e2..ba32d0d 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java @@ -22,13 +22,13 @@ import org.apache.flink.runtime.instance.ActorGateway; import java.util.Map; -public class JarAccessDeniedHandler implements RequestHandler { +public class JarAccessDeniedHandler extends AbstractJsonRequestHandler { private static final String ERROR_MESSAGE = "{\"error\": \"Web submission interface is not " + "available for this cluster. To enable it, set the configuration key ' jobmanager.web.submit.enable.'\"}"; @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { return ERROR_MESSAGE; } } http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java index 9da54c1..1e23f1f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java @@ -47,7 +47,7 @@ import java.util.Map; /** * Abstract handler for fetching plan for a jar or running a jar. */ -public abstract class JarActionHandler implements RequestHandler { +public abstract class JarActionHandler extends AbstractJsonRequestHandler { private final File jarDir; http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java index 6e6c520..ae959a5 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java @@ -29,7 +29,7 @@ import java.util.Map; /** * Handles requests for deletion of jars. */ -public class JarDeleteHandler implements RequestHandler { +public class JarDeleteHandler extends AbstractJsonRequestHandler { private final File jarDir; @@ -38,7 +38,7 @@ public class JarDeleteHandler implements RequestHandler { } @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { final String file = pathParams.get("jarid"); try { File[] list = jarDir.listFiles(new FilenameFilter() { http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java index c263628..f3cdc30 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java @@ -31,7 +31,7 @@ import java.util.Map; import java.util.jar.JarFile; import java.util.jar.Manifest; -public class JarListHandler implements RequestHandler { +public class JarListHandler extends AbstractJsonRequestHandler { private final File jarDir; @@ -40,7 +40,7 @@ public class JarListHandler implements RequestHandler { } @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java index 7e0a810..3a95d6a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java @@ -37,7 +37,7 @@ public class JarPlanHandler extends JarActionHandler { } @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { JobGraph graph = getJobGraphAndClassLoader(pathParams, queryParams).f0; StringWriter writer = new StringWriter(); http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index 18b0f15..8d3e57f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -48,7 +48,7 @@ public class JarRunHandler extends JarActionHandler { } @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { Tuple2<JobGraph, ClassLoader> graph = getJobGraphAndClassLoader(pathParams, queryParams); try { http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java index 011e8f9..9a3b0e1 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java @@ -27,7 +27,7 @@ import java.util.UUID; /** * Handles requests for uploading of jars. */ -public class JarUploadHandler implements RequestHandler { +public class JarUploadHandler extends AbstractJsonRequestHandler { private final File jarDir; @@ -36,7 +36,7 @@ public class JarUploadHandler implements RequestHandler { } @Override - public String handleRequest( + public String handleJsonRequest( Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java index b17acdc..9f35719 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java @@ -28,10 +28,10 @@ import java.util.Map; /** * Request handler for the CANCEL request. */ -public class JobCancellationHandler implements RequestHandler { +public class JobCancellationHandler extends AbstractJsonRequestHandler { @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); if (jobManager != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java index 6d9f7e1..11ca931 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java @@ -28,7 +28,7 @@ import java.util.Map; /** * Returns the Job Manager's configuration. */ -public class JobManagerConfigHandler implements RequestHandler { +public class JobManagerConfigHandler extends AbstractJsonRequestHandler { private final Configuration config; @@ -37,7 +37,7 @@ public class JobManagerConfigHandler implements RequestHandler { } @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java index 791790a..0f8c958 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java @@ -28,10 +28,10 @@ import java.util.Map; /** * Request handler for the STOP request. */ -public class JobStoppingHandler implements RequestHandler { +public class JobStoppingHandler extends AbstractJsonRequestHandler { @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { JobID jobid = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid"))); if (jobManager != null) { http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java index 0927b7e..c56cfc3 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java @@ -18,29 +18,35 @@ package org.apache.flink.runtime.webmonitor.handlers; +import io.netty.handler.codec.http.FullHttpResponse; import org.apache.flink.runtime.instance.ActorGateway; import java.util.Map; /** - * Base interface for all request handlers. The handlers must produce a JSOn response. + * Base interface for all request handlers. + * + * <p>Most handlers will want to use the {@link AbstractJsonRequestHandler} + * as a starting point, which produces a valid HTTP response. */ public interface RequestHandler { /** * Core method that handles the request and generates the response. The method needs to - * respond with a valid JSON string. Exceptions may be throws and will be handled. + * respond with a full http response, including content-type, content-length, etc. + * + * <p>Exceptions may be throws and will be handled. * * @param pathParams The map of REST path parameters, decoded by the router. * @param queryParams The map of query parameters. * @param jobManager The JobManager actor. - * - * @return The JSON string that is the HTTP response. + * + * @return The full http response. * * @throws Exception Handlers may forward exceptions. Exceptions of type * {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404 * response with the exception message, other exceptions will cause a HTTP 500 response * with the exception stack trace. */ - String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception; + FullHttpResponse handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java index b5e9088..c20d4fe 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java @@ -26,7 +26,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers; import org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance; import org.apache.flink.util.StringUtils; - import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -38,7 +37,7 @@ import java.util.Map; import static java.util.Objects.requireNonNull; -public class TaskManagersHandler implements RequestHandler { +public class TaskManagersHandler extends AbstractJsonRequestHandler { public static final String TASK_MANAGER_ID_KEY = "taskmanagerid"; @@ -49,7 +48,7 @@ public class TaskManagersHandler implements RequestHandler { } @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { try { if (jobManager != null) { // whether one task manager's metrics are requested, or all task manager, we http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java index 8374523..80126c6 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java @@ -19,8 +19,8 @@ package org.apache.flink.runtime.webmonitor.metrics; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.instance.ActorGateway; +import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler; import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; -import org.apache.flink.runtime.webmonitor.handlers.RequestHandler; import org.apache.flink.util.Preconditions; import java.io.IOException; @@ -38,7 +38,7 @@ import java.util.Map; * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } */ -public abstract class AbstractMetricsHandler implements RequestHandler { +public abstract class AbstractMetricsHandler extends AbstractJsonRequestHandler { private final MetricFetcher fetcher; public AbstractMetricsHandler(MetricFetcher fetcher) { @@ -46,7 +46,7 @@ public abstract class AbstractMetricsHandler implements RequestHandler { } @Override - public String handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { + public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { fetcher.update(); String requestedMetricsList = queryParams.get("get"); return requestedMetricsList != null http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java index 483dbf6..13a9067 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java @@ -48,7 +48,7 @@ public class AbstractMetricsHandlerTest extends TestLogger { pathParams.put("vertexid", "taskid"); // get list of available metrics - String availableList = handler.handleRequest(pathParams, queryParams, null); + String availableList = handler.handleJsonRequest(pathParams, queryParams, null); assertEquals("[" + "{\"id\":\"8.opname.abc.metric5\"}," + @@ -59,7 +59,7 @@ public class AbstractMetricsHandlerTest extends TestLogger { // get value for a single metric queryParams.put("get", "8.opname.abc.metric5"); - String metricValue = handler.handleRequest(pathParams, queryParams, null); + String metricValue = handler.handleJsonRequest(pathParams, queryParams, null); assertEquals("[" + "{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" + @@ -70,7 +70,7 @@ public class AbstractMetricsHandlerTest extends TestLogger { // get values for multiple metrics queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4"); - String metricValues = handler.handleRequest(pathParams, queryParams, null); + String metricValues = handler.handleJsonRequest(pathParams, queryParams, null); assertEquals("[" + "{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," +
