http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
deleted file mode 100644
index 1ec3f9c..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractJsonRequestHandler.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.util.Preconditions;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-
-import java.nio.charset.Charset;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Base class for most request handlers. The handlers must produce a JSON response.
- */
-public abstract class AbstractJsonRequestHandler implements RequestHandler {
-
-       private static final Charset ENCODING = Charset.forName("UTF-8");
-
-       protected final Executor executor;
-
-       protected AbstractJsonRequestHandler(Executor executor) {
-               this.executor = Preconditions.checkNotNull(executor);
-       }
-
-       @Override
-       public CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-               CompletableFuture<String> resultFuture = handleJsonRequest(pathParams, queryParams, jobManagerGateway);
-
-               return resultFuture.thenApplyAsync(
-                       (String result) -> {
-                               byte[] bytes = result.getBytes(ENCODING);
-
-                               DefaultFullHttpResponse response = new DefaultFullHttpResponse(
-                                       HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
-
-                               response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; charset=" + ENCODING.name());
-                               response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, response.content().readableBytes());
-
-                               return response;
-                       });
-       }
-
-       /**
-        * Core method that handles the request and generates the response. The method needs to
-        * respond with a valid JSON string. Exceptions may be thrown and will be handled.
-        *
-        * @param pathParams The map of REST path parameters, decoded by the router.
-        * @param queryParams The map of query parameters.
-        * @param jobManagerGateway to communicate with the JobManager.
-        *
-        * @return The JSON string that is the HTTP response.
-        *
-        * @throws Exception Handlers may forward exceptions. Exceptions of type
-        *         {@link org.apache.flink.runtime.webmonitor.NotFoundException} will cause a HTTP 404
-        *         response with the exception message, other exceptions will cause a HTTP 500 response
-        *         with the exception stack trace.
-        */
-       public abstract CompletableFuture<String> handleJsonRequest(
-                       Map<String, String> pathParams,
-                       Map<String, String> queryParams,
-                       JobManagerGateway jobManagerGateway);
-
-}
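
As the import changes further down in this commit show, this base class does not disappear but moves to org.apache.flink.runtime.rest.handler.legacy. For orientation, a concrete handler built on this contract follows the pattern below (hypothetical sketch: the class name, path, and response are illustrative; the signatures are the ones from the file above):

    package org.apache.flink.runtime.webmonitor.handlers;

    import org.apache.flink.runtime.jobmaster.JobManagerGateway;

    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Executor;

    // Hypothetical example handler; serves a constant JSON document under /hello.
    public class HelloHandler extends AbstractJsonRequestHandler {

        public HelloHandler(Executor executor) {
            super(executor);
        }

        @Override
        public String[] getPaths() {
            return new String[]{"/hello"};
        }

        @Override
        public CompletableFuture<String> handleJsonRequest(
                Map<String, String> pathParams,
                Map<String, String> queryParams,
                JobManagerGateway jobManagerGateway) {
            // The contract above requires a valid JSON string as the response body.
            return CompletableFuture.completedFuture("{\"message\":\"hello\"}");
        }
    }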

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
deleted file mode 100644
index 1b20673..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskAttemptRequestHandler.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.util.FlinkException;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Base class for request handlers whose response depends on a specific subtask execution attempt
- * (defined via the "attempt" parameter) of a specific subtask (defined via the
- * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a
- * specific job (defined via the "jobid" parameter).
- */
-public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler {
-
-       public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-               super(executionGraphHolder, executor);
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) {
-               final String attemptNumberString = params.get("attempt");
-               if (attemptNumberString == null) {
-                       return FutureUtils.completedExceptionally(new FlinkException("Attempt number parameter missing"));
-               }
-
-               final int attempt;
-               try {
-                       attempt = Integer.parseInt(attemptNumberString);
-               }
-               catch (NumberFormatException e) {
-                       return FutureUtils.completedExceptionally(new FlinkException("Invalid attempt number parameter"));
-               }
-
-               final AccessExecution currentAttempt = vertex.getCurrentExecutionAttempt();
-               if (attempt == currentAttempt.getAttemptNumber()) {
-                       return handleRequest(currentAttempt, params);
-               }
-               else if (attempt >= 0 && attempt < currentAttempt.getAttemptNumber()) {
-                       AccessExecution exec = vertex.getPriorExecutionAttempt(attempt);
-
-                       if (exec != null) {
-                               return handleRequest(exec, params);
-                       } else {
-                               return FutureUtils.completedExceptionally(new RequestHandlerException("Execution for attempt " + attempt +
-                                       " has already been deleted."));
-                       }
-               }
-               else {
-                       return FutureUtils.completedExceptionally(new FlinkException("Attempt does not exist: " + attempt));
-               }
-       }
-
-       public abstract CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
deleted file mode 100644
index ab85034..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/AbstractSubtaskRequestHandler.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.util.FlinkException;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Base class for request handlers whose response depends on a specific subtask (defined via the
- * "subtasknum" parameter) in a specific job vertex (defined via the "vertexid" parameter) in a
- * specific job (defined via the "jobid" parameter).
- */
-public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler {
-
-       public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-               super(executionGraphHolder, executor);
-       }
-
-       @Override
-       public final CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
-               final String subtaskNumberString = params.get("subtasknum");
-               if (subtaskNumberString == null) {
-                       return FutureUtils.completedExceptionally(new FlinkException("Subtask number parameter missing"));
-               }
-
-               final int subtask;
-               try {
-                       subtask = Integer.parseInt(subtaskNumberString);
-               }
-               catch (NumberFormatException e) {
-                       return FutureUtils.completedExceptionally(new FlinkException("Invalid subtask number parameter", e));
-               }
-
-               if (subtask < 0 || subtask >= jobVertex.getParallelism()) {
-                       return FutureUtils.completedExceptionally(new FlinkException("subtask does not exist: " + subtask));
-               }
-
-               final AccessExecutionVertex vertex = jobVertex.getTaskVertices()[subtask];
-               return handleRequest(vertex, params);
-       }
-
-       public abstract CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
deleted file mode 100644
index 17db2e8..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ClusterOverviewHandler.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-import org.apache.flink.util.FlinkException;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Responder that returns the status of the Flink cluster, such as how many
- * TaskManagers are currently connected, and how many jobs are running.
- */
-public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
-
-       private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
-
-       private static final String version = EnvironmentInformation.getVersion();
-
-       private static final String commitID = EnvironmentInformation.getRevisionInformation().commitId;
-
-       private final Time timeout;
-
-       public ClusterOverviewHandler(Executor executor, Time timeout) {
-               super(executor);
-               this.timeout = checkNotNull(timeout);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{CLUSTER_OVERVIEW_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-               // we need no parameters, get all requests
-               try {
-                       if (jobManagerGateway != null) {
-                               CompletableFuture<StatusOverview> overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
-
-                               return overviewFuture.thenApplyAsync(
-                                       (StatusOverview overview) -> {
-                                               StringWriter writer = new StringWriter();
-                                               try {
-                                                       JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-                                                       gen.writeStartObject();
-                                                       gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
-                                                       gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
-                                                       gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
-                                                       gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
-                                                       gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
-                                                       gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
-                                                       gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
-                                                       gen.writeStringField("flink-version", version);
-                                                       if (!commitID.equals(EnvironmentInformation.UNKNOWN)) {
-                                                               gen.writeStringField("flink-commit", commitID);
-                                                       }
-                                                       gen.writeEndObject();
-
-                                                       gen.close();
-                                                       return writer.toString();
-                                               } catch (IOException exception) {
-                                                       throw new FlinkFutureException("Could not write cluster overview.", exception);
-                                               }
-                                       },
-                                       executor);
-                       } else {
-                               throw new Exception("No connection to the leading JobManager.");
-                       }
-               }
-               catch (Exception e) {
-                       return FutureUtils.completedExceptionally(new FlinkException("Failed to fetch list of all running jobs: ", e));
-               }
-       }
-}
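
Read off the generator calls above, the /overview response has roughly the following shape (values illustrative; "flink-commit" is only written when the revision is known):

    {
      "taskmanagers": 2,
      "slots-total": 8,
      "slots-available": 6,
      "jobs-running": 1,
      "jobs-finished": 0,
      "jobs-cancelled": 0,
      "jobs-failed": 0,
      "flink-version": "1.4-SNAPSHOT",
      "flink-commit": "4fc019a"
    }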

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
deleted file mode 100644
index 34898e7..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ConstantTextHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.configuration.ConfigConstants;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-
-/**
- * Responder that returns a constant String.
- */
[email protected]
-public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {
-
-       private final byte[] encodedText;
-
-       public ConstantTextHandler(String text) {
-               this.encodedText = text.getBytes(ConfigConstants.DEFAULT_CHARSET);
-       }
-
-       @Override
-       protected void channelRead0(ChannelHandlerContext ctx, Routed routed) throws Exception {
-               HttpResponse response = new DefaultFullHttpResponse(
-                       HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(encodedText));
-
-               response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, encodedText.length);
-               response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "text/plain");
-
-               KeepAliveWrite.flush(ctx, routed.request(), response);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
deleted file mode 100644
index acf1cd0..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobIdsHandler.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * Responder that returns a list of all JobIDs of jobs found at the target actor.
- * May serve the IDs of current jobs, or past jobs, depending on whether this handler is
- * given the JobManager or Archive Actor Reference.
- */
-public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
-
-       private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
-
-       private final Time timeout;
-
-       public CurrentJobIdsHandler(Executor executor, Time timeout) {
-               super(executor);
-               this.timeout = requireNonNull(timeout);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{CURRENT_JOB_IDS_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               // we need no parameters, get all requests
-                               try {
-                                       if (jobManagerGateway != null) {
-                                               CompletableFuture<JobsWithIDsOverview> overviewFuture = jobManagerGateway.requestJobsOverview(timeout);
-                                               JobsWithIDsOverview overview = overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
-
-                                               StringWriter writer = new StringWriter();
-                                               JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-                                               gen.writeStartObject();
-
-                                               gen.writeArrayFieldStart("jobs-running");
-                                               for (JobID jid : overview.getJobsRunningOrPending()) {
-                                                       gen.writeString(jid.toString());
-                                               }
-                                               gen.writeEndArray();
-
-                                               gen.writeArrayFieldStart("jobs-finished");
-                                               for (JobID jid : overview.getJobsFinished()) {
-                                                       gen.writeString(jid.toString());
-                                               }
-                                               gen.writeEndArray();
-
-                                               gen.writeArrayFieldStart("jobs-cancelled");
-                                               for (JobID jid : overview.getJobsCancelled()) {
-                                                       gen.writeString(jid.toString());
-                                               }
-                                               gen.writeEndArray();
-
-                                               gen.writeArrayFieldStart("jobs-failed");
-                                               for (JobID jid : overview.getJobsFailed()) {
-                                                       gen.writeString(jid.toString());
-                                               }
-                                               gen.writeEndArray();
-
-                                               gen.writeEndObject();
-
-                                               gen.close();
-                                               return writer.toString();
-                                       }
-                                       else {
-                                               throw new Exception("No connection to the leading JobManager.");
-                                       }
-                               }
-                               catch (Exception e) {
-                                       throw new FlinkFutureException("Failed to fetch list of all running jobs.", e);
-                               }
-                       },
-                       executor);
-       }
-}
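
The /jobs response is therefore four arrays of job IDs (the ID below is illustrative):

    {
      "jobs-running": ["7684be6004e4e955c2a558a9bc463f65"],
      "jobs-finished": [],
      "jobs-cancelled": [],
      "jobs-failed": []
    }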

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
deleted file mode 100644
index a5b116c..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Request handler that returns a summary of the job status.
- */
-public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
-
-       private static final String ALL_JOBS_REST_PATH = "/joboverview";
-       private static final String RUNNING_JOBS_REST_PATH = "/joboverview/running";
-       private static final String COMPLETED_JOBS_REST_PATH = "/joboverview/completed";
-
-       private final Time timeout;
-
-       private final boolean includeRunningJobs;
-       private final boolean includeFinishedJobs;
-
-       public CurrentJobsOverviewHandler(
-                       Executor executor,
-                       Time timeout,
-                       boolean includeRunningJobs,
-                       boolean includeFinishedJobs) {
-
-               super(executor);
-               this.timeout = checkNotNull(timeout);
-               this.includeRunningJobs = includeRunningJobs;
-               this.includeFinishedJobs = includeFinishedJobs;
-       }
-
-       @Override
-       public String[] getPaths() {
-               if (includeRunningJobs && includeFinishedJobs) {
-                       return new String[]{ALL_JOBS_REST_PATH};
-               }
-               if (includeRunningJobs) {
-                       return new String[]{RUNNING_JOBS_REST_PATH};
-               } else {
-                       return new String[]{COMPLETED_JOBS_REST_PATH};
-               }
-       }
-
-       @Override
-       public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-               if (jobManagerGateway != null) {
-                       CompletableFuture<MultipleJobsDetails> jobDetailsFuture = jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, timeout);
-
-                       return jobDetailsFuture.thenApplyAsync(
-                               (MultipleJobsDetails result) -> {
-                                       final long now = System.currentTimeMillis();
-
-                                       StringWriter writer = new StringWriter();
-                                       try {
-                                               JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-                                               gen.writeStartObject();
-
-                                               if (includeRunningJobs && includeFinishedJobs) {
-                                                       gen.writeArrayFieldStart("running");
-                                                       for (JobDetails detail : result.getRunningJobs()) {
-                                                               writeJobDetailOverviewAsJson(detail, gen, now);
-                                                       }
-                                                       gen.writeEndArray();
-
-                                                       gen.writeArrayFieldStart("finished");
-                                                       for (JobDetails detail : result.getFinishedJobs()) {
-                                                               writeJobDetailOverviewAsJson(detail, gen, now);
-                                                       }
-                                                       gen.writeEndArray();
-                                               } else {
-                                                       gen.writeArrayFieldStart("jobs");
-                                                       for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
-                                                               writeJobDetailOverviewAsJson(detail, gen, now);
-                                                       }
-                                                       gen.writeEndArray();
-                                               }
-
-                                               gen.writeEndObject();
-                                               gen.close();
-                                               return writer.toString();
-                                       } catch (IOException e) {
-                                               throw new FlinkFutureException("Could not write current jobs overview json.", e);
-                                       }
-                               },
-                               executor);
-               }
-               else {
-                       return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
-               }
-       }
-
-       /**
-        * Archivist for the CurrentJobsOverviewHandler.
-        */
-       public static class CurrentJobsOverviewJsonArchivist implements JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       StringWriter writer = new StringWriter();
-                       try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
-                               gen.writeStartObject();
-                               gen.writeArrayFieldStart("running");
-                               gen.writeEndArray();
-                               gen.writeArrayFieldStart("finished");
-                               writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis());
-                               gen.writeEndArray();
-                               gen.writeEndObject();
-                       }
-                       String json = writer.toString();
-                       String path = ALL_JOBS_REST_PATH;
-                       return Collections.singleton(new ArchivedJson(path, json));
-               }
-       }
-
-       public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException {
-               gen.writeStartObject();
-
-               gen.writeStringField("jid", details.getJobId().toString());
-               gen.writeStringField("name", details.getJobName());
-               gen.writeStringField("state", details.getStatus().name());
-
-               gen.writeNumberField("start-time", details.getStartTime());
-               gen.writeNumberField("end-time", details.getEndTime());
-               gen.writeNumberField("duration", (details.getEndTime() <= 0 ? now : details.getEndTime()) - details.getStartTime());
-               gen.writeNumberField("last-modification", details.getLastUpdateTime());
-
-               gen.writeObjectFieldStart("tasks");
-               gen.writeNumberField("total", details.getNumTasks());
-
-               final int[] perState = details.getNumVerticesPerExecutionState();
-               gen.writeNumberField("pending", perState[ExecutionState.CREATED.ordinal()] +
-                               perState[ExecutionState.SCHEDULED.ordinal()] +
-                               perState[ExecutionState.DEPLOYING.ordinal()]);
-               gen.writeNumberField("running", perState[ExecutionState.RUNNING.ordinal()]);
-               gen.writeNumberField("finished", perState[ExecutionState.FINISHED.ordinal()]);
-               gen.writeNumberField("canceling", perState[ExecutionState.CANCELING.ordinal()]);
-               gen.writeNumberField("canceled", perState[ExecutionState.CANCELED.ordinal()]);
-               gen.writeNumberField("failed", perState[ExecutionState.FAILED.ordinal()]);
-               gen.writeEndObject();
-
-               gen.writeEndObject();
-       }
-}
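
Each entry produced by writeJobDetailOverviewAsJson has the shape sketched below (values illustrative); depending on the constructor flags the entries are grouped under "running" and "finished", or under a single "jobs" array:

    {
      "jid": "7684be6004e4e955c2a558a9bc463f65",
      "name": "WordCount",
      "state": "RUNNING",
      "start-time": 1500000000000,
      "end-time": -1,
      "duration": 120000,
      "last-modification": 1500000060000,
      "tasks": {
        "total": 4,
        "pending": 0,
        "running": 4,
        "finished": 0,
        "canceling": 0,
        "canceled": 0,
        "failed": 0
      }
    }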

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
deleted file mode 100644
index 39984b1..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.TimeZone;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Responder that returns the parameters that define how the asynchronous requests
- * against this web server should behave. It defines for example the refresh interval,
- * and time zone of the server timestamps.
- */
-public class DashboardConfigHandler extends AbstractJsonRequestHandler {
-
-       private static final String DASHBOARD_CONFIG_REST_PATH = "/config";
-
-       private final String configString;
-
-       public DashboardConfigHandler(Executor executor, long refreshInterval) {
-               super(executor);
-               try {
-                       this.configString = createConfigJson(refreshInterval);
-               }
-               catch (Exception e) {
-                       // should never happen
-                       throw new RuntimeException(e.getMessage(), e);
-               }
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{DASHBOARD_CONFIG_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-               return CompletableFuture.completedFuture(configString);
-       }
-
-       public static String createConfigJson(long refreshInterval) throws IOException {
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-               TimeZone timeZone = TimeZone.getDefault();
-               String timeZoneName = timeZone.getDisplayName();
-               long timeZoneOffset = timeZone.getRawOffset();
-
-               gen.writeStartObject();
-               gen.writeNumberField("refresh-interval", refreshInterval);
-               gen.writeNumberField("timezone-offset", timeZoneOffset);
-               gen.writeStringField("timezone-name", timeZoneName);
-               gen.writeStringField("flink-version", 
EnvironmentInformation.getVersion());
-
-               EnvironmentInformation.RevisionInformation revision = 
EnvironmentInformation.getRevisionInformation();
-               if (revision != null) {
-                       gen.writeStringField("flink-revision", 
revision.commitId + " @ " + revision.commitDate);
-               }
-
-               gen.writeEndObject();
-
-               gen.close();
-
-               return writer.toString();
-       }
-}
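
The resulting /config document looks roughly like this (values illustrative; "flink-revision" only appears when revision information is available):

    {
      "refresh-interval": 3000,
      "timezone-offset": 3600000,
      "timezone-name": "Central European Time",
      "flink-version": "1.4-SNAPSHOT",
      "flink-revision": "4fc019a @ <commit date>"
    }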

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
index 978432b..c95cc32 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarAccessDeniedHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
 
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
index 0b0d32e..760c836 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarActionHandler.java
@@ -35,6 +35,8 @@ import org.apache.flink.optimizer.plan.StreamingPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.util.ExceptionUtils;
 
 import com.fasterxml.jackson.core.JsonGenerator;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
index d9df1d4..04f663d 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarDeleteHandler.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
index 95281a4..4248dd4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.runtime.webmonitor.RuntimeMonitorHandler;
 
 import com.fasterxml.jackson.core.JsonGenerator;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
index b117b3d..4d79492 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
index 7ada0b4..16a1565 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.concurrent.FlinkFutureException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
 import org.apache.flink.util.Preconditions;
 
 import com.fasterxml.jackson.core.JsonGenerator;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
index 61b3f58..9a0bac4 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarUploadHandler.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.AbstractJsonRequestHandler;
 
 import java.io.File;
 import java.util.Map;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
deleted file mode 100644
index 4dede3a..0000000
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the aggregated user accumulators of a job.
- */
-public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler {
-
-       private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators";
-
-       public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-               super(executionGraphHolder, executor);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOB_ACCUMULATORS_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return createJobAccumulatorsJson(graph);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could not create job accumulators json.", e);
-                               }
-                       },
-                       executor);
-       }
-
-       /**
-        * Archivist for the JobAccumulatorsHandler.
-        */
-       public static class JobAccumulatorsJsonArchivist implements JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       String json = createJobAccumulatorsJson(graph);
-                       String path = JOB_ACCUMULATORS_REST_PATH
-                               .replace(":jobid", graph.getJobID().toString());
-                       return Collections.singletonList(new ArchivedJson(path, json));
-               }
-       }
-
-       public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException {
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-               StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
-
-               gen.writeStartObject();
-
-               gen.writeArrayFieldStart("job-accumulators");
-               // empty for now
-               gen.writeEndArray();
-
-               gen.writeArrayFieldStart("user-task-accumulators");
-               for (StringifiedAccumulatorResult acc : allAccumulators) {
-                       gen.writeStartObject();
-                       gen.writeStringField("name", acc.getName());
-                       gen.writeStringField("type", acc.getType());
-                       gen.writeStringField("value", acc.getValue());
-                       gen.writeEndObject();
-               }
-               gen.writeEndArray();
-               gen.writeEndObject();
-
-               gen.close();
-               return writer.toString();
-       }
-}
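
The /jobs/:jobid/accumulators response keeps "job-accumulators" empty for now and lists the stringified user accumulators; an illustrative example:

    {
      "job-accumulators": [],
      "user-task-accumulators": [
        { "name": "numRecords", "type": "LongCounter", "value": "42" }
      ]
    }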

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
deleted file mode 100644
index 1a7d868..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler for the CANCEL request.
- */
-public class JobCancellationHandler extends AbstractJsonRequestHandler {
-
-       private static final String JOB_CANCELLATION_REST_PATH = "/jobs/:jobid/cancel";
-       private static final String JOB_CANCELLATION_YARN_REST_PATH = "/jobs/:jobid/yarn-cancel";
-
-       private final Time timeout;
-
-       public JobCancellationHandler(Executor executor, Time timeout) {
-               super(executor);
-               this.timeout = Preconditions.checkNotNull(timeout);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOB_CANCELLATION_REST_PATH, JOB_CANCELLATION_YARN_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       JobID jobId = new 
JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-                                       if (jobManagerGateway != null) {
-                                               
jobManagerGateway.cancelJob(jobId, timeout);
-                                               return "{}";
-                                       }
-                                       else {
-                                               throw new Exception("No 
connection to the leading JobManager.");
-                                       }
-                               }
-                               catch (Exception e) {
-                                       throw new FlinkFutureException("Failed 
to cancel the job with id: "  + pathParams.get("jobid"), e);
-                               }
-                       },
-                       executor);
-       }
-}
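
As a rough usage sketch (not part of this change): assuming a web monitor is reachable on localhost:8081 and that the yarn-cancel route is served via a plain GET, a client could cancel a job as below. The host, port and job id are placeholders.

    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class CancelJobSketch {

        public static void main(String[] args) throws Exception {
            String jobId = "7684be6004e4e955c2a558a9bc463f65"; // placeholder hex job id

            URL url = new URL("http://localhost:8081/jobs/" + jobId + "/yarn-cancel");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("GET");

            // on success the handler answers with an empty JSON object: {}
            try (InputStream in = conn.getInputStream()) {
                // readAllBytes() requires Java 9+
                System.out.println(conn.getResponseCode() + " " + new String(in.readAllBytes(), "UTF-8"));
            }
        }
    }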

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
deleted file mode 100644
index 4e41447..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobCancellationWithSavepointHandlers.java
+++ /dev/null
@@ -1,428 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.CoreOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.NotFoundException;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.nio.charset.Charset;
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Request handler for {@link CancelJobWithSavepoint} messages.
- */
-public class JobCancellationWithSavepointHandlers {
-
-       private static final String CANCEL_WITH_SAVEPOINT_REST_PATH = 
"/jobs/:jobid/cancel-with-savepoint";
-       private static final String CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH = 
"/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory";
-
-       /** URL for in-progress cancellations. */
-       private static final String CANCELLATION_IN_PROGRESS_REST_PATH = 
"/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";
-
-       /** Encoding used for the JSON response bodies. */
-       private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
-
-       /** Shared lock between Trigger and In-Progress handlers. */
-       private final Object lock = new Object();
-
-       /** In-Progress requests. */
-       private final Map<JobID, Long> inProgress = new HashMap<>();
-
-       /** Succeeded/failed request. Either String or Throwable. */
-       private final Map<Long, Object> completed = new HashMap<>();
-
-       /** Atomic request counter. */
-       private long requestCounter;
-
-       /** Handler for trigger requests. */
-       private final TriggerHandler triggerHandler;
-
-       /** Handler for in-progress requests. */
-       private final InProgressHandler inProgressHandler;
-
-       /** Default savepoint directory. */
-       private final String defaultSavepointDirectory;
-
-       public JobCancellationWithSavepointHandlers(
-                       ExecutionGraphHolder currentGraphs,
-                       Executor executor) {
-               this(currentGraphs, executor, null);
-       }
-
-       public JobCancellationWithSavepointHandlers(
-                       ExecutionGraphHolder currentGraphs,
-                       Executor executor,
-                       @Nullable String defaultSavepointDirectory) {
-
-               this.triggerHandler = new TriggerHandler(currentGraphs, 
executor);
-               this.inProgressHandler = new InProgressHandler();
-               this.defaultSavepointDirectory = defaultSavepointDirectory;
-       }
-
-       public TriggerHandler getTriggerHandler() {
-               return triggerHandler;
-       }
-
-       public InProgressHandler getInProgressHandler() {
-               return inProgressHandler;
-       }
-
-       // 
------------------------------------------------------------------------
-       // New requests
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Handler for triggering a {@link CancelJobWithSavepoint} message.
-        */
-       class TriggerHandler implements RequestHandler {
-
-               /** Current execution graphs. */
-               private final ExecutionGraphHolder currentGraphs;
-
-               /** Execution context for futures. */
-               private final Executor executor;
-
-               public TriggerHandler(ExecutionGraphHolder currentGraphs, 
Executor executor) {
-                       this.currentGraphs = checkNotNull(currentGraphs);
-                       this.executor = checkNotNull(executor);
-               }
-
-               @Override
-               public String[] getPaths() {
-                       return new String[]{CANCEL_WITH_SAVEPOINT_REST_PATH, 
CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH};
-               }
-
-               @Override
-               @SuppressWarnings("unchecked")
-               public CompletableFuture<FullHttpResponse> handleRequest(
-                               Map<String, String> pathParams,
-                               Map<String, String> queryParams,
-                               JobManagerGateway jobManagerGateway) {
-
-                       if (jobManagerGateway != null) {
-                               JobID jobId = 
JobID.fromHexString(pathParams.get("jobid"));
-                               final 
CompletableFuture<Optional<AccessExecutionGraph>> graphFuture;
-
-                               graphFuture = 
currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
-
-                               return graphFuture.thenApplyAsync(
-                                       (Optional<AccessExecutionGraph> 
optGraph) -> {
-                                               final AccessExecutionGraph 
graph = optGraph.orElseThrow(
-                                                       () -> new 
FlinkFutureException(
-                                                               new 
NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
-
-                                               CheckpointCoordinator coord = 
graph.getCheckpointCoordinator();
-                                               if (coord == null) {
-                                                       throw new 
FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for 
job."));
-                                               }
-
-                                               String targetDirectory = 
pathParams.get("targetDirectory");
-                                               if (targetDirectory == null) {
-                                                       if 
(defaultSavepointDirectory == null) {
-                                                               throw new 
IllegalStateException("No savepoint directory configured. " +
-                                                                       "You 
can either specify a directory when triggering this savepoint or " +
-                                                                       
"configure a cluster-wide default via key '" +
-                                                                       
CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
-                                                       } else {
-                                                               targetDirectory 
= defaultSavepointDirectory;
-                                                       }
-                                               }
-
-                                               try {
-                                                       return 
handleNewRequest(jobManagerGateway, jobId, targetDirectory, 
coord.getCheckpointTimeout());
-                                               } catch (IOException e) {
-                                                       throw new 
FlinkFutureException("Could not cancel job with savepoint.", e);
-                                               }
-                                       }, executor);
-                       } else {
-                               return FutureUtils.completedExceptionally(new 
Exception("No connection to the leading JobManager."));
-                       }
-               }
-
-               @SuppressWarnings("unchecked")
-               private FullHttpResponse handleNewRequest(JobManagerGateway 
jobManagerGateway, final JobID jobId, String targetDirectory, long 
checkpointTimeout) throws IOException {
-                       // Check whether a request exists
-                       final long requestId;
-                       final boolean isNewRequest;
-                       synchronized (lock) {
-                               if (inProgress.containsKey(jobId)) {
-                                       requestId = inProgress.get(jobId);
-                                       isNewRequest = false;
-                               } else {
-                                       requestId = ++requestCounter;
-                                       inProgress.put(jobId, requestId);
-                                       isNewRequest = true;
-                               }
-                       }
-
-                       if (isNewRequest) {
-                               boolean success = false;
-
-                               try {
-                                       // Trigger cancellation
-                                       CompletableFuture<String> 
cancelJobFuture = jobManagerGateway
-                                               .cancelJobWithSavepoint(jobId, 
targetDirectory, Time.milliseconds(checkpointTimeout));
-
-                                       cancelJobFuture.whenCompleteAsync(
-                                               (String path, Throwable 
throwable) -> {
-                                                       try {
-                                                               if (throwable 
!= null) {
-                                                                       
completed.put(requestId, throwable);
-                                                               } else {
-                                                                       
completed.put(requestId, path);
-                                                               }
-                                                       } finally {
-                                                               
inProgress.remove(jobId);
-                                                       }
-                                               }, executor);
-
-                                       success = true;
-                               } finally {
-                                       synchronized (lock) {
-                                               if (!success) {
-                                                       
inProgress.remove(jobId);
-                                               }
-                                       }
-                               }
-                       }
-
-                       // In-progress location
-                       String location = CANCELLATION_IN_PROGRESS_REST_PATH
-                                       .replace(":jobid", jobId.toString())
-                                       .replace(":requestId", 
Long.toString(requestId));
-
-                       // Accepted response
-                       StringWriter writer = new StringWriter();
-                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-                       gen.writeStartObject();
-                       gen.writeStringField("status", "accepted");
-                       gen.writeNumberField("request-id", requestId);
-                       gen.writeStringField("location", location);
-                       gen.writeEndObject();
-                       gen.close();
-
-                       String json = writer.toString();
-                       byte[] bytes = json.getBytes(ENCODING);
-
-                       DefaultFullHttpResponse response = new 
DefaultFullHttpResponse(
-                                       HttpVersion.HTTP_1_1,
-                                       HttpResponseStatus.ACCEPTED,
-                                       Unpooled.wrappedBuffer(bytes));
-
-                       response.headers().set(HttpHeaders.Names.LOCATION, 
location);
-
-                       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json; charset=" + ENCODING.name());
-                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
-
-                       FullHttpResponse accepted = response;
-
-                       return accepted;
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       // In-progress requests
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Handler for in-progress cancel with savepoint operations.
-        */
-       class InProgressHandler implements RequestHandler {
-
-               /** The number of recently completed requests that are remembered. */
-               private static final int NUM_GHOST_REQUEST_IDS = 16;
-
-               /** Recently completed requests and their results. */
-               private final ArrayDeque<Tuple2<Long, Object>> recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);
-
-               @Override
-               public String[] getPaths() {
-                       return new String[]{CANCELLATION_IN_PROGRESS_REST_PATH};
-               }
-
-               @Override
-               @SuppressWarnings("unchecked")
-               public CompletableFuture<FullHttpResponse> 
handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, 
JobManagerGateway jobManagerGateway) {
-                       JobID jobId = 
JobID.fromHexString(pathParams.get("jobid"));
-                       long requestId = 
Long.parseLong(pathParams.get("requestId"));
-
-                       return CompletableFuture.supplyAsync(
-                               () -> {
-                                       try {
-                                               synchronized (lock) {
-                                                       Object result = completed.remove(requestId);
-
-                                                       if (result != null) {
-                                                               // Add to recent history
-                                                               recentlyCompleted.add(new Tuple2<>(requestId, result));
-                                                               if (recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
-                                                                       recentlyCompleted.remove();
-                                                               }
-
-                                                               if (result.getClass() == String.class) {
-                                                                       String savepointPath = (String) result;
-                                                                       return createSuccessResponse(requestId, savepointPath);
-                                                               } else {
-                                                                       Throwable cause = (Throwable) result;
-                                                                       return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
-                                                               }
-                                                       } else {
-                                                               // Check in-progress
-                                                               Long inProgressRequestId = inProgress.get(jobId);
-                                                               if (inProgressRequestId != null) {
-                                                                       // Sanity check
-                                                                       if (inProgressRequestId == requestId) {
-                                                                               return createInProgressResponse(requestId);
-                                                                       } else {
-                                                                               String msg = "Request ID does not belong to JobID";
-                                                                               return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
-                                                                       }
-                                                               }
-
-                                                               // Check recent history
-                                                               for (Tuple2<Long, Object> recent : recentlyCompleted) {
-                                                                       if (recent.f0 == requestId) {
-                                                                               if (recent.f1.getClass() == String.class) {
-                                                                                       String savepointPath = (String) recent.f1;
-                                                                                       return createSuccessResponse(requestId, savepointPath);
-                                                                               } else {
-                                                                                       Throwable cause = (Throwable) recent.f1;
-                                                                                       return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, cause.getMessage());
-                                                                               }
-                                                                       }
-                                                               }
-
-                                                               return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown job/request ID");
-                                                       }
-                                               }
-                                       } catch (Exception e) {
-                                               throw new FlinkFutureException("Could not handle in progress request.", e);
-                                       }
-                               });
-               }
-
-               private FullHttpResponse createSuccessResponse(long requestId, 
String savepointPath) throws IOException {
-                       StringWriter writer = new StringWriter();
-                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-                       gen.writeStartObject();
-
-                       gen.writeStringField("status", "success");
-                       gen.writeNumberField("request-id", requestId);
-                       gen.writeStringField("savepoint-path", savepointPath);
-
-                       gen.writeEndObject();
-                       gen.close();
-
-                       String json = writer.toString();
-                       byte[] bytes = json.getBytes(ENCODING);
-
-                       DefaultFullHttpResponse response = new 
DefaultFullHttpResponse(
-                                       HttpVersion.HTTP_1_1,
-                                       HttpResponseStatus.CREATED,
-                                       Unpooled.wrappedBuffer(bytes));
-
-                       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json; charset=" + ENCODING.name());
-                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
-
-                       return response;
-               }
-
-               private FullHttpResponse createInProgressResponse(long 
requestId) throws IOException {
-                       StringWriter writer = new StringWriter();
-                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-                       gen.writeStartObject();
-
-                       gen.writeStringField("status", "in-progress");
-                       gen.writeNumberField("request-id", requestId);
-
-                       gen.writeEndObject();
-                       gen.close();
-
-                       String json = writer.toString();
-                       byte[] bytes = json.getBytes(ENCODING);
-
-                       DefaultFullHttpResponse response = new 
DefaultFullHttpResponse(
-                                       HttpVersion.HTTP_1_1,
-                                       HttpResponseStatus.ACCEPTED,
-                                       Unpooled.wrappedBuffer(bytes));
-
-                       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json; charset=" + ENCODING.name());
-                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
-
-                       return response;
-               }
-
-               private FullHttpResponse 
createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) 
throws IOException {
-                       StringWriter writer = new StringWriter();
-                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-                       gen.writeStartObject();
-
-                       gen.writeStringField("status", "failed");
-                       gen.writeNumberField("request-id", requestId);
-                       gen.writeStringField("cause", errMsg);
-
-                       gen.writeEndObject();
-                       gen.close();
-
-                       String json = writer.toString();
-                       byte[] bytes = json.getBytes(ENCODING);
-
-                       DefaultFullHttpResponse response = new 
DefaultFullHttpResponse(
-                                       HttpVersion.HTTP_1_1,
-                                       code,
-                                       Unpooled.wrappedBuffer(bytes));
-
-                       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json; charset=" + ENCODING.name());
-                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
-
-                       return response;
-               }
-       }
-}
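
The two handlers implement a trigger-then-poll protocol: the trigger answers 202 Accepted with a request id and an in-progress location, and the in-progress handler answers "in-progress" (202), "success" (201, carrying the savepoint path) or "failed" (non-2xx, carrying a "cause"). Below is a hedged client-side sketch of that protocol; host, job id, request id and target directory are placeholders, a real target directory would need URL-encoding, and a real client would parse the request id and location out of the trigger response instead of hard-coding them.

    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class CancelWithSavepointSketch {

        public static void main(String[] args) throws Exception {
            String base = "http://localhost:8081";                  // placeholder
            String jobId = "7684be6004e4e955c2a558a9bc463f65";      // placeholder

            // 1) trigger: returns {"status":"accepted","request-id":...,"location":...}
            String trigger = get(base + "/jobs/" + jobId
                + "/cancel-with-savepoint/target-directory/savepoints-dir"); // placeholder directory
            System.out.println(trigger);

            // 2) poll the in-progress location until the status is no longer "in-progress";
            //    failed requests come back with a non-2xx code and a "cause" field
            String inProgressUrl = base + "/jobs/" + jobId + "/cancel-with-savepoint/in-progress/1"; // placeholder id
            for (int i = 0; i < 60; i++) {
                String body = get(inProgressUrl);
                System.out.println(body);
                if (!body.contains("\"in-progress\"")) {
                    break;
                }
                Thread.sleep(1000L);
            }
        }

        private static String get(String url) throws Exception {
            HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
            try (InputStream in = conn.getInputStream()) {
                return new String(in.readAllBytes(), "UTF-8"); // readAllBytes() requires Java 9+
            }
        }
    }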

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
deleted file mode 100644
index 0b15b37..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.ArchivedExecutionConfig;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the execution config of a job.
- */
-public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
-
-       private static final String JOB_CONFIG_REST_PATH = 
"/jobs/:jobid/config";
-
-       public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, 
Executor executor) {
-               super(executionGraphHolder, executor);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOB_CONFIG_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return createJobConfigJson(graph);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could 
not write job config json.", e);
-                               }
-                       },
-                       executor);
-
-       }
-
-       /**
-        * Archivist for the JobConfigHandler.
-        */
-       public static class JobConfigJsonArchivist implements JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       String json = createJobConfigJson(graph);
-                       String path = JOB_CONFIG_REST_PATH
-                               .replace(":jobid", graph.getJobID().toString());
-                       return Collections.singletonList(new ArchivedJson(path, 
json));
-               }
-       }
-
-       public static String createJobConfigJson(AccessExecutionGraph graph) 
throws IOException {
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-               gen.writeStartObject();
-               gen.writeStringField("jid", graph.getJobID().toString());
-               gen.writeStringField("name", graph.getJobName());
-
-               final ArchivedExecutionConfig summary = 
graph.getArchivedExecutionConfig();
-
-               if (summary != null) {
-                       gen.writeObjectFieldStart("execution-config");
-
-                       gen.writeStringField("execution-mode", 
summary.getExecutionMode());
-
-                       gen.writeStringField("restart-strategy", 
summary.getRestartStrategyDescription());
-                       gen.writeNumberField("job-parallelism", 
summary.getParallelism());
-                       gen.writeBooleanField("object-reuse-mode", 
summary.getObjectReuseEnabled());
-
-                       Map<String, String> ucVals = 
summary.getGlobalJobParameters();
-                       if (ucVals != null) {
-                               gen.writeObjectFieldStart("user-config");
-
-                               for (Map.Entry<String, String> ucVal : 
ucVals.entrySet()) {
-                                       gen.writeStringField(ucVal.getKey(), 
ucVal.getValue());
-                               }
-
-                               gen.writeEndObject();
-                       }
-
-                       gen.writeEndObject();
-               }
-               gen.writeEndObject();
-
-               gen.close();
-               return writer.toString();
-       }
-}
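
For reference, a standalone Jackson sketch of the config JSON this handler produced; all field values are placeholders, and plain JsonFactory stands in for Flink's shaded wrapper.

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonGenerator;

    import java.io.IOException;
    import java.io.StringWriter;

    public class JobConfigJsonSketch {

        public static void main(String[] args) throws IOException {
            StringWriter writer = new StringWriter();
            JsonGenerator gen = new JsonFactory().createGenerator(writer);

            gen.writeStartObject();
            gen.writeStringField("jid", "7684be6004e4e955c2a558a9bc463f65"); // placeholder
            gen.writeStringField("name", "WordCount");                       // placeholder

            // written only if an ArchivedExecutionConfig is available
            gen.writeObjectFieldStart("execution-config");
            gen.writeStringField("execution-mode", "PIPELINED");             // placeholder
            gen.writeStringField("restart-strategy", "default");             // placeholder
            gen.writeNumberField("job-parallelism", 4);                      // placeholder
            gen.writeBooleanField("object-reuse-mode", false);               // placeholder

            // optional global job parameters
            gen.writeObjectFieldStart("user-config");
            gen.writeStringField("some.key", "some value");                  // placeholder
            gen.writeEndObject();

            gen.writeEndObject();
            gen.writeEndObject();

            gen.close();
            System.out.println(writer.toString());
        }
    }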

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
deleted file mode 100644
index 8a50f87..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns details about a job. This includes:
- * <ul>
- *     <li>Dataflow plan</li>
- *     <li>id, name, and current status</li>
- *     <li>start time, end time, duration</li>
- *     <li>number of job vertices in each state (pending, running, finished, 
failed)</li>
- *     <li>info about job vertices, including runtime, status, I/O bytes and 
records, subtasks in each status</li>
- * </ul>
- */
-public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
-
-       private static final String JOB_DETAILS_REST_PATH = "/jobs/:jobid";
-       private static final String JOB_DETAILS_VERTICES_REST_PATH = 
"/jobs/:jobid/vertices";
-
-       private final MetricFetcher fetcher;
-
-       public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, 
Executor executor, MetricFetcher fetcher) {
-               super(executionGraphHolder, executor);
-               this.fetcher = fetcher;
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOB_DETAILS_REST_PATH, 
JOB_DETAILS_VERTICES_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return createJobDetailsJson(graph, 
fetcher);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could 
not create job details json.", e);
-                               }
-                       },
-                       executor);
-       }
-
-       /**
-        * Archivist for the JobDetailsHandler.
-        */
-       public static class JobDetailsJsonArchivist implements JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       String json = createJobDetailsJson(graph, null);
-                       String path1 = JOB_DETAILS_REST_PATH
-                               .replace(":jobid", graph.getJobID().toString());
-                       String path2 = JOB_DETAILS_VERTICES_REST_PATH
-                               .replace(":jobid", graph.getJobID().toString());
-                       Collection<ArchivedJson> archives = new ArrayList<>();
-                       archives.add(new ArchivedJson(path1, json));
-                       archives.add(new ArchivedJson(path2, json));
-                       return archives;
-               }
-       }
-
-       public static String createJobDetailsJson(AccessExecutionGraph graph, 
@Nullable MetricFetcher fetcher) throws IOException {
-               final StringWriter writer = new StringWriter();
-               final JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-               final long now = System.currentTimeMillis();
-
-               gen.writeStartObject();
-
-               // basic info
-               gen.writeStringField("jid", graph.getJobID().toString());
-               gen.writeStringField("name", graph.getJobName());
-               gen.writeBooleanField("isStoppable", graph.isStoppable());
-               gen.writeStringField("state", graph.getState().name());
-
-               // times and duration
-               final long jobStartTime = 
graph.getStatusTimestamp(JobStatus.CREATED);
-               final long jobEndTime = 
graph.getState().isGloballyTerminalState() ?
-                               graph.getStatusTimestamp(graph.getState()) : 
-1L;
-               gen.writeNumberField("start-time", jobStartTime);
-               gen.writeNumberField("end-time", jobEndTime);
-               gen.writeNumberField("duration", (jobEndTime > 0 ? jobEndTime : 
now) - jobStartTime);
-               gen.writeNumberField("now", now);
-
-               // timestamps
-               gen.writeObjectFieldStart("timestamps");
-               for (JobStatus status : JobStatus.values()) {
-                       gen.writeNumberField(status.name(), 
graph.getStatusTimestamp(status));
-               }
-               gen.writeEndObject();
-
-               // job vertices
-               int[] jobVerticesPerState = new 
int[ExecutionState.values().length];
-               gen.writeArrayFieldStart("vertices");
-
-               for (AccessExecutionJobVertex ejv : 
graph.getVerticesTopologically()) {
-                       int[] tasksPerState = new 
int[ExecutionState.values().length];
-                       long startTime = Long.MAX_VALUE;
-                       long endTime = 0;
-                       boolean allFinished = true;
-
-                       for (AccessExecutionVertex vertex : 
ejv.getTaskVertices()) {
-                               final ExecutionState state = 
vertex.getExecutionState();
-                               tasksPerState[state.ordinal()]++;
-
-                               // take the earliest start time
-                               long started = 
vertex.getStateTimestamp(ExecutionState.DEPLOYING);
-                               if (started > 0) {
-                                       startTime = Math.min(startTime, 
started);
-                               }
-
-                               allFinished &= state.isTerminal();
-                               endTime = Math.max(endTime, 
vertex.getStateTimestamp(state));
-                       }
-
-                       long duration;
-                       if (startTime < Long.MAX_VALUE) {
-                               if (allFinished) {
-                                       duration = endTime - startTime;
-                               }
-                               else {
-                                       endTime = -1L;
-                                       duration = now - startTime;
-                               }
-                       }
-                       else {
-                               startTime = -1L;
-                               endTime = -1L;
-                               duration = -1L;
-                       }
-
-                       ExecutionState jobVertexState =
-                                       
ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, 
ejv.getParallelism());
-                       jobVerticesPerState[jobVertexState.ordinal()]++;
-
-                       gen.writeStartObject();
-                       gen.writeStringField("id", 
ejv.getJobVertexId().toString());
-                       gen.writeStringField("name", ejv.getName());
-                       gen.writeNumberField("parallelism", 
ejv.getParallelism());
-                       gen.writeStringField("status", jobVertexState.name());
-
-                       gen.writeNumberField("start-time", startTime);
-                       gen.writeNumberField("end-time", endTime);
-                       gen.writeNumberField("duration", duration);
-
-                       gen.writeObjectFieldStart("tasks");
-                       for (ExecutionState state : ExecutionState.values()) {
-                               gen.writeNumberField(state.name(), 
tasksPerState[state.ordinal()]);
-                       }
-                       gen.writeEndObject();
-
-                       MutableIOMetrics counts = new MutableIOMetrics();
-
-                       for (AccessExecutionVertex vertex : 
ejv.getTaskVertices()) {
-                               counts.addIOMetrics(
-                                       vertex.getCurrentExecutionAttempt(),
-                                       fetcher,
-                                       graph.getJobID().toString(),
-                                       ejv.getJobVertexId().toString());
-                       }
-
-                       counts.writeIOMetricsAsJson(gen);
-
-                       gen.writeEndObject();
-               }
-               gen.writeEndArray();
-
-               gen.writeObjectFieldStart("status-counts");
-               for (ExecutionState state : ExecutionState.values()) {
-                       gen.writeNumberField(state.name(), 
jobVerticesPerState[state.ordinal()]);
-               }
-               gen.writeEndObject();
-
-               gen.writeFieldName("plan");
-               gen.writeRawValue(graph.getJsonPlan());
-
-               gen.writeEndObject();
-
-               gen.close();
-               return writer.toString();
-       }
-}
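
One detail worth calling out is how the handler aggregates subtask states: counts are kept in an int array indexed by the enum ordinal and then written out per state. A minimal sketch of that technique follows, with a placeholder enum and placeholder values standing in for Flink's ExecutionState and the real subtask states.

    public class StateTallySketch {

        // placeholder enum standing in for org.apache.flink.runtime.execution.ExecutionState
        enum State { CREATED, DEPLOYING, RUNNING, FINISHED, FAILED }

        public static void main(String[] args) {
            // placeholder subtask states
            State[] subtasks = { State.RUNNING, State.RUNNING, State.FINISHED };

            // one counter per state, indexed by ordinal (as in createJobDetailsJson)
            int[] tasksPerState = new int[State.values().length];
            for (State s : subtasks) {
                tasksPerState[s.ordinal()]++;
            }

            // written out per state, like the "tasks" / "status-counts" objects in the JSON
            for (State s : State.values()) {
                System.out.println(s.name() + " = " + tasksPerState[s.ordinal()]);
            }
        }
    }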

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
deleted file mode 100644
index 6ffd443..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ErrorInfo;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.util.ExceptionUtils;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the exceptions of a job.
- */
-public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler 
{
-
-       private static final String JOB_EXCEPTIONS_REST_PATH = 
"/jobs/:jobid/exceptions";
-
-       static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
-
-       public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, 
Executor executor) {
-               super(executionGraphHolder, executor);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOB_EXCEPTIONS_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return createJobExceptionsJson(graph);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could 
not create job exceptions json.", e);
-                               }
-                       },
-                       executor
-               );
-       }
-
-       /**
-        * Archivist for the JobExceptionsHandler.
-        */
-       public static class JobExceptionsJsonArchivist implements JsonArchivist 
{
-
-               @Override
-               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       String json = createJobExceptionsJson(graph);
-                       String path = JOB_EXCEPTIONS_REST_PATH
-                               .replace(":jobid", graph.getJobID().toString());
-                       return Collections.singletonList(new ArchivedJson(path, 
json));
-               }
-       }
-
-       public static String createJobExceptionsJson(AccessExecutionGraph 
graph) throws IOException {
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-               gen.writeStartObject();
-
-               // most important is the root failure cause
-               ErrorInfo rootException = graph.getFailureCause();
-               if (rootException != null && 
!rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION))
 {
-                       gen.writeStringField("root-exception", 
rootException.getExceptionAsString());
-                       gen.writeNumberField("timestamp", 
rootException.getTimestamp());
-               }
-
-               // we additionally collect all exceptions (up to a limit) that 
occurred in the individual tasks
-               gen.writeArrayFieldStart("all-exceptions");
-
-               int numExceptionsSoFar = 0;
-               boolean truncated = false;
-
-               for (AccessExecutionVertex task : 
graph.getAllExecutionVertices()) {
-                       String t = task.getFailureCauseAsString();
-                       if (t != null && 
!t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
-                               if (numExceptionsSoFar >= 
MAX_NUMBER_EXCEPTION_TO_REPORT) {
-                                       truncated = true;
-                                       break;
-                               }
-
-                               TaskManagerLocation location = 
task.getCurrentAssignedResourceLocation();
-                               String locationString = location != null ?
-                                               location.getFQDNHostname() + 
':' + location.dataPort() : "(unassigned)";
-
-                               gen.writeStartObject();
-                               gen.writeStringField("exception", t);
-                               gen.writeStringField("task", 
task.getTaskNameWithSubtaskIndex());
-                               gen.writeStringField("location", 
locationString);
-                               long timestamp = 
task.getStateTimestamp(ExecutionState.FAILED);
-                               gen.writeNumberField("timestamp", timestamp == 
0 ? -1 : timestamp);
-                               gen.writeEndObject();
-                               numExceptionsSoFar++;
-                       }
-               }
-               gen.writeEndArray();
-
-               gen.writeBooleanField("truncated", truncated);
-               gen.writeEndObject();
-
-               gen.close();
-               return writer.toString();
-       }
-}
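
The handler capped the per-task exception list at MAX_NUMBER_EXCEPTION_TO_REPORT (20) and signalled the cut-off with a "truncated" flag. A small self-contained sketch of that truncation rule, using made-up failure strings:

    import java.util.ArrayList;
    import java.util.List;

    public class ExceptionTruncationSketch {

        static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;

        public static void main(String[] args) {
            // 25 placeholder task failures
            List<String> taskFailures = new ArrayList<>();
            for (int i = 0; i < 25; i++) {
                taskFailures.add("java.lang.RuntimeException: boom #" + i);
            }

            // report at most MAX_NUMBER_EXCEPTION_TO_REPORT and remember whether we cut off
            List<String> reported = new ArrayList<>();
            boolean truncated = false;
            for (String failure : taskFailures) {
                if (reported.size() >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
                    truncated = true;
                    break;
                }
                reported.add(failure);
            }

            System.out.println("reported=" + reported.size() + ", truncated=" + truncated);
        }
    }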
