[FLINK-4787] [runtime-web] Return generic HttpResponse in RequestHandler

- Let RequestHandler return a generic HttpResponse instead of a String. This
  enables handlers to return custom responses (different HTTP codes, etc.)
- Introduce AbstractJsonRequestHandler for default JSON responses


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2fb60091
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2fb60091
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2fb60091

Branch: refs/heads/master
Commit: 2fb600916860acf2256464659ca60424bbf26857
Parents: e9b20ec
Author: Ufuk Celebi <[email protected]>
Authored: Tue Oct 11 10:08:14 2016 +0200
Committer: Ufuk Celebi <[email protected]>
Committed: Fri Oct 28 11:04:12 2016 +0200

----------------------------------------------------------------------
 .../webmonitor/RuntimeMonitorHandler.java       | 23 +++---
 .../AbstractExecutionGraphRequestHandler.java   |  4 +-
 .../handlers/AbstractJsonRequestHandler.java    | 73 ++++++++++++++++++++
 .../handlers/ClusterOverviewHandler.java        |  4 +-
 .../handlers/CurrentJobIdsHandler.java          |  5 +-
 .../handlers/CurrentJobsOverviewHandler.java    |  4 +-
 .../handlers/DashboardConfigHandler.java        |  4 +-
 .../handlers/JarAccessDeniedHandler.java        |  4 +-
 .../webmonitor/handlers/JarActionHandler.java   |  2 +-
 .../webmonitor/handlers/JarDeleteHandler.java   |  4 +-
 .../webmonitor/handlers/JarListHandler.java     |  4 +-
 .../webmonitor/handlers/JarPlanHandler.java     |  2 +-
 .../webmonitor/handlers/JarRunHandler.java      |  2 +-
 .../webmonitor/handlers/JarUploadHandler.java   |  4 +-
 .../handlers/JobCancellationHandler.java        |  4 +-
 .../handlers/JobManagerConfigHandler.java       |  4 +-
 .../webmonitor/handlers/JobStoppingHandler.java |  4 +-
 .../webmonitor/handlers/RequestHandler.java     | 16 +++--
 .../handlers/TaskManagersHandler.java           |  5 +-
 .../metrics/AbstractMetricsHandler.java         |  6 +-
 .../metrics/AbstractMetricsHandlerTest.java     |  6 +-
 21 files changed, 130 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 5008a8c..aba4e17 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -23,6 +23,7 @@ import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
 import io.netty.handler.codec.http.HttpHeaders;
 import io.netty.handler.codec.http.HttpResponseStatus;
 import io.netty.handler.codec.http.HttpVersion;
@@ -61,7 +62,7 @@ public class RuntimeMonitorHandler extends 
RuntimeMonitorHandlerBase {
 
        public static final String WEB_MONITOR_ADDRESS_KEY = 
"web.monitor.address";
        
-       private final RequestHandler handler;   
+       private final RequestHandler handler;
 
        public RuntimeMonitorHandler(
                        RequestHandler handler,
@@ -75,7 +76,7 @@ public class RuntimeMonitorHandler extends 
RuntimeMonitorHandlerBase {
 
        @Override
        protected void respondAsLeader(ChannelHandlerContext ctx, Routed 
routed, ActorGateway jobManager) {
-               DefaultFullHttpResponse response;
+               FullHttpResponse response;
 
                try {
                        // we only pass the first element in the list to the 
handlers.
@@ -93,14 +94,7 @@ public class RuntimeMonitorHandler extends 
RuntimeMonitorHandlerBase {
                        queryParams.put(WEB_MONITOR_ADDRESS_KEY,
                                (httpsEnabled ? "https://" : "http://") + 
address.getHostName() + ":" + address.getPort());
 
-                       String result = handler.handleRequest(pathParams, 
queryParams, jobManager);
-                       byte[] bytes = result.getBytes(ENCODING);
-
-                       response = new DefaultFullHttpResponse(
-                                       HttpVersion.HTTP_1_1, 
HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
-
-                       
response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
-                       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json");
+                       response = handler.handleRequest(pathParams, 
queryParams, jobManager);
                }
                catch (NotFoundException e) {
                        // this should result in a 404 error code (not found)
@@ -108,6 +102,8 @@ public class RuntimeMonitorHandler extends 
RuntimeMonitorHandlerBase {
                                        : 
Unpooled.wrappedBuffer(e.getMessage().getBytes(ENCODING));
                        response = new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.NOT_FOUND, 
message);
                        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"text/plain");
+                       
response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
                        LOG.debug("Error while handling request", e);
                }
                catch (Exception e) {
@@ -115,11 +111,14 @@ public class RuntimeMonitorHandler extends 
RuntimeMonitorHandlerBase {
                        response = new 
DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                                        
HttpResponseStatus.INTERNAL_SERVER_ERROR, Unpooled.wrappedBuffer(bytes));
                        response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"text/plain");
+                       
response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, "utf-8");
+                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
+
                        LOG.debug("Error while handling request", e);
                }
 
-               response.headers().set(HttpHeaders.Names.CONTENT_ENCODING, 
"utf-8");
-               response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
+               
response.headers().set(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, "*");
+
                KeepAliveWrite.flush(ctx, routed.request(), response);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
index ff28d4e..8cd70e9 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractExecutionGraphRequestHandler.java
@@ -30,7 +30,7 @@ import java.util.Map;
  * Base class for request handlers whose response depends on an ExecutionGraph
  * that can be retrieved via "jobid" parameter.
  */
-public abstract class AbstractExecutionGraphRequestHandler implements 
RequestHandler {
+public abstract class AbstractExecutionGraphRequestHandler extends 
AbstractJsonRequestHandler {
        
        private final ExecutionGraphHolder executionGraphHolder;
        
@@ -39,7 +39,7 @@ public abstract class AbstractExecutionGraphRequestHandler 
implements RequestHan
        }
 
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                String jidString = pathParams.get("jobid");
                if (jidString == null) {
                        throw new RuntimeException("JobId parameter missing");

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
new file mode 100644
index 0000000..ae163cb
--- /dev/null
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import org.apache.flink.runtime.instance.ActorGateway;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+
+/**
+ * Base class for most request handlers. The handlers must produce a JSON 
response.
+ */
+public abstract class AbstractJsonRequestHandler implements RequestHandler {
+
+       private static final Charset ENCODING = Charset.forName("UTF-8");
+
+       @Override
+       public FullHttpResponse handleRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
+               String result = handleJsonRequest(pathParams, queryParams, 
jobManager);
+               byte[] bytes = result.getBytes(ENCODING);
+
+               DefaultFullHttpResponse response = new DefaultFullHttpResponse(
+                               HttpVersion.HTTP_1_1, HttpResponseStatus.OK, 
Unpooled.wrappedBuffer(bytes));
+
+               response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json");
+               response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
+
+               return response;
+       }
+
+       /**
+        * Core method that handles the request and generates the response. The 
method needs to
+        * respond with a valid JSON string. Exceptions may be thrown and will 
be handled.
+        *
+        * @param pathParams The map of REST path parameters, decoded by the 
router.
+        * @param queryParams The map of query parameters.
+        * @param jobManager The JobManager actor.
+        *
+        * @return The JSON string that is the HTTP response.
+        *
+        * @throws Exception Handlers may forward exceptions. Exceptions of type
+        *         {@link 
org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
+        *         response with the exception message, other exceptions will 
cause a HTTP 500 response
+        *         with the exception stack trace.
+        */
+       public abstract String handleJsonRequest(
+                       Map<String, String> pathParams,
+                       Map<String, String> queryParams,
+                       ActorGateway jobManager) throws Exception;
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
index b7389c4..99ef3d9 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
@@ -36,7 +36,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * Responder that returns the status of the Flink cluster, such as how many
  * TaskManagers are currently connected, and how many jobs are running.
  */
-public class ClusterOverviewHandler implements RequestHandler {
+public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
 
        private static final String version = 
EnvironmentInformation.getVersion();
 
@@ -49,7 +49,7 @@ public class ClusterOverviewHandler implements RequestHandler 
{
        }
 
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                // we need no parameters, get all requests
                try {
                        if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
index 11f2a3b..b690c56 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobsWithIDsOverview;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -38,7 +37,7 @@ import static java.util.Objects.requireNonNull;
  * May serve the IDs of current jobs, or past jobs, depending on whether this 
handler is
  * given the JobManager or Archive Actor Reference.
  */
-public class CurrentJobIdsHandler implements RequestHandler {
+public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
 
        private final FiniteDuration timeout;
        
@@ -47,7 +46,7 @@ public class CurrentJobIdsHandler implements RequestHandler {
        }
        
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                // we need no parameters, get all requests
                try {
                        if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 571f911..07064da 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -36,7 +36,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Request handler that returns a summary of the job status.
  */
-public class CurrentJobsOverviewHandler implements RequestHandler {
+public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
 
        private final FiniteDuration timeout;
        
@@ -55,7 +55,7 @@ public class CurrentJobsOverviewHandler implements 
RequestHandler {
        }
 
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {
                        if (jobManager != null) {
                                Future<Object> future = jobManager.ask(

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index debb24c..6fe072b 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -31,7 +31,7 @@ import java.util.TimeZone;
  * against this web server should behave. It defines for example the refresh 
interval,
  * and time zone of the server timestamps.
  */
-public class DashboardConfigHandler implements RequestHandler {
+public class DashboardConfigHandler extends AbstractJsonRequestHandler {
        
        private final String configString;
        
@@ -67,7 +67,7 @@ public class DashboardConfigHandler implements RequestHandler 
{
        }
        
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                return this.configString;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index 67673e2..ba32d0d 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -22,13 +22,13 @@ import org.apache.flink.runtime.instance.ActorGateway;
 
 import java.util.Map;
 
-public class JarAccessDeniedHandler implements RequestHandler {
+public class JarAccessDeniedHandler extends AbstractJsonRequestHandler {
 
        private static final String ERROR_MESSAGE = "{\"error\": \"Web 
submission interface is not " +
                        "available for this cluster. To enable it, set the 
configuration key ' jobmanager.web.submit.enable.'\"}";
 
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                return ERROR_MESSAGE;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index 9da54c1..1e23f1f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -47,7 +47,7 @@ import java.util.Map;
 /**
  * Abstract handler for fetching plan for a jar or running a jar.
  */
-public abstract class JarActionHandler implements RequestHandler {
+public abstract class JarActionHandler extends AbstractJsonRequestHandler {
        
        private final File jarDir;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index 6e6c520..ae959a5 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -29,7 +29,7 @@ import java.util.Map;
 /**
  * Handles requests for deletion of jars.
  */
-public class JarDeleteHandler implements RequestHandler {
+public class JarDeleteHandler extends AbstractJsonRequestHandler {
 
        private final File jarDir;
 
@@ -38,7 +38,7 @@ public class JarDeleteHandler implements RequestHandler {
        }
 
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                final String file = pathParams.get("jarid");
                try {
                        File[] list = jarDir.listFiles(new FilenameFilter() {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index c263628..f3cdc30 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -31,7 +31,7 @@ import java.util.Map;
 import java.util.jar.JarFile;
 import java.util.jar.Manifest;
 
-public class JarListHandler implements RequestHandler {
+public class JarListHandler extends AbstractJsonRequestHandler {
 
        private final File jarDir;
 
@@ -40,7 +40,7 @@ public class JarListHandler implements RequestHandler {
        }
 
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {
                        StringWriter writer = new StringWriter();
                        JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index 7e0a810..3a95d6a 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -37,7 +37,7 @@ public class JarPlanHandler extends JarActionHandler {
        }
 
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {
                        JobGraph graph = getJobGraphAndClassLoader(pathParams, 
queryParams).f0;
                        StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 18b0f15..8d3e57f 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -48,7 +48,7 @@ public class JarRunHandler extends JarActionHandler {
        }
 
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {
                        Tuple2<JobGraph, ClassLoader> graph = 
getJobGraphAndClassLoader(pathParams, queryParams);
                        try {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 011e8f9..9a3b0e1 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -27,7 +27,7 @@ import java.util.UUID;
 /**
  * Handles requests for uploading of jars.
  */
-public class JarUploadHandler implements RequestHandler {
+public class JarUploadHandler extends AbstractJsonRequestHandler {
 
        private final File jarDir;
 
@@ -36,7 +36,7 @@ public class JarUploadHandler implements RequestHandler {
        }
 
        @Override
-       public String handleRequest(
+       public String handleJsonRequest(
                                Map<String, String> pathParams,
                                Map<String, String> queryParams,
                                ActorGateway jobManager) throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
index b17acdc..9f35719 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
@@ -28,10 +28,10 @@ import java.util.Map;
 /**
  * Request handler for the CANCEL request.
  */
-public class JobCancellationHandler implements RequestHandler {
+public class JobCancellationHandler extends AbstractJsonRequestHandler {
 
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {
                        JobID jobid = new 
JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
                        if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
index 6d9f7e1..11ca931 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
@@ -28,7 +28,7 @@ import java.util.Map;
 /**
  * Returns the Job Manager's configuration.
  */
-public class JobManagerConfigHandler implements RequestHandler {
+public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
 
        private final Configuration config;
 
@@ -37,7 +37,7 @@ public class JobManagerConfigHandler implements 
RequestHandler {
        }
 
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
index 791790a..0f8c958 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
@@ -28,10 +28,10 @@ import java.util.Map;
 /**
  * Request handler for the STOP request.
  */
-public class JobStoppingHandler implements RequestHandler {
+public class JobStoppingHandler extends AbstractJsonRequestHandler {
 
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {
                        JobID jobid = new 
JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
                        if (jobManager != null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
index 0927b7e..c56cfc3 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
@@ -18,29 +18,35 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import io.netty.handler.codec.http.FullHttpResponse;
 import org.apache.flink.runtime.instance.ActorGateway;
 
 import java.util.Map;
 
 /**
- * Base interface for all request handlers. The handlers must produce a JSOn 
response.
+ * Base interface for all request handlers.
+ *
+ * <p>Most handlers will want to use the {@link AbstractJsonRequestHandler}
+ * as a starting point, which produces a valid HTTP response.
  */
 public interface RequestHandler {
 
        /**
         * Core method that handles the request and generates the response. The 
method needs to
-        * respond with a valid JSON string. Exceptions may be throws and will 
be handled.
+        * respond with a full http response, including content-type, 
content-length, etc.
+        *
+        * <p>Exceptions may be thrown and will be handled.
         * 
         * @param pathParams The map of REST path parameters, decoded by the 
router.
         * @param queryParams The map of query parameters.
         * @param jobManager The JobManager actor.
-        * 
-        * @return The JSON string that is the HTTP response.
+        *
+        * @return The full http response.
         * 
         * @throws Exception Handlers may forward exceptions. Exceptions of type
         *         {@link 
org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
         *         response with the exception message, other exceptions will 
cause a HTTP 500 response
         *         with the exception stack trace.
         */
-       String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception;
+       FullHttpResponse handleRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
index b5e9088..c20d4fe 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.messages.JobManagerMessages;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.RegisteredTaskManagers;
 import 
org.apache.flink.runtime.messages.JobManagerMessages.TaskManagerInstance;
 import org.apache.flink.util.StringUtils;
-
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
@@ -38,7 +37,7 @@ import java.util.Map;
 
 import static java.util.Objects.requireNonNull;
 
-public class TaskManagersHandler implements RequestHandler {
+public class TaskManagersHandler extends AbstractJsonRequestHandler  {
 
        public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
        
@@ -49,7 +48,7 @@ public class TaskManagersHandler implements RequestHandler {
        }
 
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                try {
                        if (jobManager != null) {
                                // whether one task manager's metrics are 
requested, or all task manager, we

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
index 8374523..80126c6 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
@@ -19,8 +19,8 @@ package org.apache.flink.runtime.webmonitor.metrics;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.runtime.webmonitor.handlers.RequestHandler;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -38,7 +38,7 @@ import java.util.Map;
  * The handler will then return a list containing the values of the requested 
metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
  */
-public abstract class AbstractMetricsHandler implements RequestHandler {
+public abstract class AbstractMetricsHandler extends 
AbstractJsonRequestHandler {
        private final MetricFetcher fetcher;
 
        public AbstractMetricsHandler(MetricFetcher fetcher) {
@@ -46,7 +46,7 @@ public abstract class AbstractMetricsHandler implements 
RequestHandler {
        }
 
        @Override
-       public String handleRequest(Map<String, String> pathParams, Map<String, 
String> queryParams, ActorGateway jobManager) throws Exception {
+       public String handleJsonRequest(Map<String, String> pathParams, 
Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                fetcher.update();
                String requestedMetricsList = queryParams.get("get");
                return requestedMetricsList != null

http://git-wip-us.apache.org/repos/asf/flink/blob/2fb60091/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
index 483dbf6..13a9067 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandlerTest.java
@@ -48,7 +48,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
                pathParams.put("vertexid", "taskid");
 
                // get list of available metrics
-               String availableList = handler.handleRequest(pathParams, 
queryParams, null);
+               String availableList = handler.handleJsonRequest(pathParams, 
queryParams, null);
 
                assertEquals("[" +
                                "{\"id\":\"8.opname.abc.metric5\"}," +
@@ -59,7 +59,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
                // get value for a single metric
                queryParams.put("get", "8.opname.abc.metric5");
 
-               String metricValue = handler.handleRequest(pathParams, 
queryParams, null);
+               String metricValue = handler.handleJsonRequest(pathParams, 
queryParams, null);
 
                assertEquals("[" +
                                
"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}" +
@@ -70,7 +70,7 @@ public class AbstractMetricsHandlerTest extends TestLogger {
                // get values for multiple metrics
                queryParams.put("get", "8.opname.abc.metric5,8.abc.metric4");
 
-               String metricValues = handler.handleRequest(pathParams, 
queryParams, null);
+               String metricValues = handler.handleJsonRequest(pathParams, 
queryParams, null);
 
                assertEquals("[" +
                                
"{\"id\":\"8.opname.abc.metric5\",\"value\":\"4\"}," +

Reply via email to