http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/WebHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/WebHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/WebHandler.java
new file mode 100644
index 0000000..58fda14
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/WebHandler.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler;
+
+/**
+ * Marker interface for web handlers which can describe their paths.
+ */
+public interface WebHandler {
+
+       /**
+        * Returns an array of REST URL's under which this handler can be 
registered.
+        *
+        * @return array containing REST URL's under which this handler can be 
registered.
+        */
+       String[] getPaths();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
new file mode 100644
index 0000000..e214a36
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.NotFoundException;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for request handlers whose response depends on an ExecutionGraph
+ * that can be retrieved via "jobid" parameter.
+ */
+public abstract class AbstractExecutionGraphRequestHandler extends 
AbstractJsonRequestHandler {
+
+       private final ExecutionGraphHolder executionGraphHolder;
+
+       public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executor);
+               this.executionGraphHolder = 
Preconditions.checkNotNull(executionGraphHolder);
+       }
+
+       @Override
+       public CompletableFuture<String> handleJsonRequest(
+                       Map<String, String> pathParams,
+                       Map<String, String> queryParams,
+                       JobManagerGateway jobManagerGateway) {
+               String jidString = pathParams.get("jobid");
+               if (jidString == null) {
+                       throw new RuntimeException("JobId parameter missing");
+               }
+
+               JobID jid;
+               try {
+                       jid = JobID.fromHexString(jidString);
+               }
+               catch (Exception e) {
+                       return FutureUtils.completedExceptionally(new 
FlinkException("Invalid JobID string '" + jidString + "'", e));
+               }
+
+               final CompletableFuture<Optional<AccessExecutionGraph>> 
graphFuture = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway);
+
+               return graphFuture.thenComposeAsync(
+                       (Optional<AccessExecutionGraph> optGraph) -> {
+                               if (optGraph.isPresent()) {
+                                       return handleRequest(optGraph.get(), 
pathParams);
+                               } else {
+                                       throw new FlinkFutureException(new 
NotFoundException("Could not find job with jobId " + jid + '.'));
+                               }
+                       }, executor);
+       }
+
+       public abstract CompletableFuture<String> 
handleRequest(AccessExecutionGraph graph, Map<String, String> params);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
new file mode 100644
index 0000000..e2e4484
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for request handlers whose response depends on a specific job 
vertex (defined
+ * via the "vertexid" parameter) in a specific job, defined via (defined voa 
the "jobid" parameter).
+ */
+public abstract class AbstractJobVertexRequestHandler extends 
AbstractExecutionGraphRequestHandler {
+
+       public AbstractJobVertexRequestHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
+       }
+
+       @Override
+       public final CompletableFuture<String> 
handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+               final JobVertexID vid = parseJobVertexId(params);
+
+               final AccessExecutionJobVertex jobVertex = 
graph.getJobVertex(vid);
+               if (jobVertex == null) {
+                       throw new IllegalArgumentException("No vertex with ID 
'" + vid + "' exists.");
+               }
+
+               return handleRequest(jobVertex, params);
+       }
+
+       /**
+        * Returns the job vertex ID parsed from the provided parameters.
+        *
+        * @param params Path parameters
+        * @return Parsed job vertex ID or <code>null</code> if not available.
+        */
+       public static JobVertexID parseJobVertexId(Map<String, String> params) {
+               String jobVertexIdParam = params.get("vertexid");
+               if (jobVertexIdParam == null) {
+                       return null;
+               }
+
+               try {
+                       return JobVertexID.fromHexString(jobVertexIdParam);
+               } catch (RuntimeException ignored) {
+                       return null;
+               }
+       }
+
+       public abstract CompletableFuture<String> 
handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJsonRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJsonRequestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJsonRequestHandler.java
new file mode 100644
index 0000000..43c4af3
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJsonRequestHandler.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.NotFoundException;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+
+import java.nio.charset.Charset;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for most request handlers. The handlers must produce a JSON 
response.
+ */
+public abstract class AbstractJsonRequestHandler implements RequestHandler {
+
+       private static final Charset ENCODING = Charset.forName("UTF-8");
+
+       protected final Executor executor;
+
+       protected AbstractJsonRequestHandler(Executor executor) {
+               this.executor = Preconditions.checkNotNull(executor);
+       }
+
+       @Override
+       public CompletableFuture<FullHttpResponse> handleRequest(Map<String, 
String> pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               CompletableFuture<String> resultFuture = 
handleJsonRequest(pathParams, queryParams, jobManagerGateway);
+
+               return resultFuture.thenApplyAsync(
+                       (String result) -> {
+                               byte[] bytes = result.getBytes(ENCODING);
+
+                               DefaultFullHttpResponse response = new 
DefaultFullHttpResponse(
+                                       HttpVersion.HTTP_1_1, 
HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
+
+                               
response.headers().set(HttpHeaders.Names.CONTENT_TYPE, "application/json; 
charset=" + ENCODING.name());
+                               
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
+
+                               return response;
+                       });
+       }
+
+       /**
+        * Core method that handles the request and generates the response. The 
method needs to
+        * respond with a valid JSON string. Exceptions may be thrown and will 
be handled.
+        *
+        * @param pathParams The map of REST path parameters, decoded by the 
router.
+        * @param queryParams The map of query parameters.
+        * @param jobManagerGateway to communicate with the JobManager.
+        *
+        * @return The JSON string that is the HTTP response.
+        *
+        * @throws Exception Handlers may forward exceptions. Exceptions of type
+        *         {@link NotFoundException} will cause a HTTP 404
+        *         response with the exception message, other exceptions will 
cause a HTTP 500 response
+        *         with the exception stack trace.
+        */
+       public abstract CompletableFuture<String> handleJsonRequest(
+                       Map<String, String> pathParams,
+                       Map<String, String> queryParams,
+                       JobManagerGateway jobManagerGateway);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java
new file mode 100644
index 0000000..ec277d8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for request handlers whose response depends on a specific 
subtask execution attempt
+ * (defined via the "attempt" parameter) of a specific subtask (defined via the
+ * "subtasknum" parameter) in a specific job vertex (defined via the 
"vertexid" parameter) in a
+ * specific job, defined via (defined voa the "jobid" parameter).
+ */
+public abstract class AbstractSubtaskAttemptRequestHandler extends 
AbstractSubtaskRequestHandler {
+
+       public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionVertex 
vertex, Map<String, String> params) {
+               final String attemptNumberString = params.get("attempt");
+               if (attemptNumberString == null) {
+                       return FutureUtils.completedExceptionally(new 
FlinkException("Attempt number parameter missing"));
+               }
+
+               final int attempt;
+               try {
+                       attempt = Integer.parseInt(attemptNumberString);
+               }
+               catch (NumberFormatException e) {
+                       return FutureUtils.completedExceptionally(new 
FlinkException("Invalid attempt number parameter"));
+               }
+
+               final AccessExecution currentAttempt = 
vertex.getCurrentExecutionAttempt();
+               if (attempt == currentAttempt.getAttemptNumber()) {
+                       return handleRequest(currentAttempt, params);
+               }
+               else if (attempt >= 0 && attempt < 
currentAttempt.getAttemptNumber()) {
+                       AccessExecution exec = 
vertex.getPriorExecutionAttempt(attempt);
+
+                       if (exec != null) {
+                               return handleRequest(exec, params);
+                       } else {
+                               return FutureUtils.completedExceptionally(new 
RequestHandlerException("Execution for attempt " + attempt +
+                                       " has already been deleted."));
+                       }
+               }
+               else {
+                       return FutureUtils.completedExceptionally(new 
FlinkException("Attempt does not exist: " + attempt));
+               }
+       }
+
+       public abstract CompletableFuture<String> handleRequest(AccessExecution 
execAttempt, Map<String, String> params);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java
new file mode 100644
index 0000000..d69038a
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.util.FlinkException;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Base class for request handlers whose response depends on a specific 
subtask (defined via the
+ * "subtasknum" parameter) in a specific job vertex (defined via the 
"vertexid" parameter) in a
+ * specific job, defined via (defined voa the "jobid" parameter).
+ */
+public abstract class AbstractSubtaskRequestHandler extends 
AbstractJobVertexRequestHandler {
+
+       public AbstractSubtaskRequestHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
+       }
+
+       @Override
+       public final CompletableFuture<String> 
handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+               final String subtaskNumberString = params.get("subtasknum");
+               if (subtaskNumberString == null) {
+                       return FutureUtils.completedExceptionally(new 
FlinkException("Subtask number parameter missing"));
+               }
+
+               final int subtask;
+               try {
+                       subtask = Integer.parseInt(subtaskNumberString);
+               }
+               catch (NumberFormatException e) {
+                       return FutureUtils.completedExceptionally(new 
FlinkException("Invalid subtask number parameter", e));
+               }
+
+               if (subtask < 0 || subtask >= jobVertex.getParallelism()) {
+                       return FutureUtils.completedExceptionally(new 
FlinkException("subtask does not exist: " + subtask));
+               }
+
+               final AccessExecutionVertex vertex = 
jobVertex.getTaskVertices()[subtask];
+               return handleRequest(vertex, params);
+       }
+
+       public abstract CompletableFuture<String> 
handleRequest(AccessExecutionVertex vertex, Map<String, String> params);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
new file mode 100644
index 0000000..db13633
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ClusterOverviewHandler.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.StatusOverview;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.util.FlinkException;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Responder that returns the status of the Flink cluster, such as how many
+ * TaskManagers are currently connected, and how many jobs are running.
+ */
+public class ClusterOverviewHandler extends AbstractJsonRequestHandler {
+
+       private static final String CLUSTER_OVERVIEW_REST_PATH = "/overview";
+
+       private static final String version = 
EnvironmentInformation.getVersion();
+
+       private static final String commitID = 
EnvironmentInformation.getRevisionInformation().commitId;
+
+       private final Time timeout;
+
+       public ClusterOverviewHandler(Executor executor, Time timeout) {
+               super(executor);
+               this.timeout = checkNotNull(timeout);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{CLUSTER_OVERVIEW_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               // we need no parameters, get all requests
+               try {
+                       if (jobManagerGateway != null) {
+                               CompletableFuture<StatusOverview> 
overviewFuture = jobManagerGateway.requestStatusOverview(timeout);
+
+                               return overviewFuture.thenApplyAsync(
+                                       (StatusOverview overview) -> {
+                                               StringWriter writer = new 
StringWriter();
+                                               try {
+                                                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+                                                       gen.writeStartObject();
+                                                       
gen.writeNumberField("taskmanagers", overview.getNumTaskManagersConnected());
+                                                       
gen.writeNumberField("slots-total", overview.getNumSlotsTotal());
+                                                       
gen.writeNumberField("slots-available", overview.getNumSlotsAvailable());
+                                                       
gen.writeNumberField("jobs-running", overview.getNumJobsRunningOrPending());
+                                                       
gen.writeNumberField("jobs-finished", overview.getNumJobsFinished());
+                                                       
gen.writeNumberField("jobs-cancelled", overview.getNumJobsCancelled());
+                                                       
gen.writeNumberField("jobs-failed", overview.getNumJobsFailed());
+                                                       
gen.writeStringField("flink-version", version);
+                                                       if 
(!commitID.equals(EnvironmentInformation.UNKNOWN)) {
+                                                               
gen.writeStringField("flink-commit", commitID);
+                                                       }
+                                                       gen.writeEndObject();
+
+                                                       gen.close();
+                                                       return 
writer.toString();
+                                               } catch (IOException exception) 
{
+                                                       throw new 
FlinkFutureException("Could not write cluster overview.", exception);
+                                               }
+                                       },
+                                       executor);
+                       } else {
+                               throw new Exception("No connection to the 
leading JobManager.");
+                       }
+               }
+               catch (Exception e) {
+                       return FutureUtils.completedExceptionally(new 
FlinkException("Failed to fetch list of all running jobs: ", e));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
new file mode 100644
index 0000000..57214f0
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ConstantTextHandler.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.configuration.ConfigConstants;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+/**
+ * Responder that returns a constant String.
+ */
[email protected]
+public class ConstantTextHandler extends SimpleChannelInboundHandler<Routed> {
+
+       private final byte[] encodedText;
+
+       public ConstantTextHandler(String text) {
+               this.encodedText = 
text.getBytes(ConfigConstants.DEFAULT_CHARSET);
+       }
+
+       @Override
+       protected void channelRead0(ChannelHandlerContext ctx, Routed routed) 
throws Exception {
+               HttpResponse response = new DefaultFullHttpResponse(
+                       HttpVersion.HTTP_1_1, HttpResponseStatus.OK, 
Unpooled.wrappedBuffer(encodedText));
+
+               response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
encodedText.length);
+               response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"text/plain");
+
+               KeepAliveWrite.flush(ctx, routed.request(), response);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
new file mode 100644
index 0000000..07d9707
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobIdsHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Responder that returns with a list of all JobIDs of jobs found at the 
target actor.
+ * May serve the IDs of current jobs, or past jobs, depending on whether this 
handler is
+ * given the JobManager or Archive Actor Reference.
+ */
+public class CurrentJobIdsHandler extends AbstractJsonRequestHandler {
+
+       private static final String CURRENT_JOB_IDS_REST_PATH = "/jobs";
+
+       private final Time timeout;
+
+       public CurrentJobIdsHandler(Executor executor, Time timeout) {
+               super(executor);
+               this.timeout = requireNonNull(timeout);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{CURRENT_JOB_IDS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               // we need no parameters, get all requests
+                               try {
+                                       if (jobManagerGateway != null) {
+                                               
CompletableFuture<JobsWithIDsOverview> overviewFuture = 
jobManagerGateway.requestJobsOverview(timeout);
+                                               JobsWithIDsOverview overview = 
overviewFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+
+                                               StringWriter writer = new 
StringWriter();
+                                               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+                                               gen.writeStartObject();
+
+                                               
gen.writeArrayFieldStart("jobs-running");
+                                               for (JobID jid : 
overview.getJobsRunningOrPending()) {
+                                                       
gen.writeString(jid.toString());
+                                               }
+                                               gen.writeEndArray();
+
+                                               
gen.writeArrayFieldStart("jobs-finished");
+                                               for (JobID jid : 
overview.getJobsFinished()) {
+                                                       
gen.writeString(jid.toString());
+                                               }
+                                               gen.writeEndArray();
+
+                                               
gen.writeArrayFieldStart("jobs-cancelled");
+                                               for (JobID jid : 
overview.getJobsCancelled()) {
+                                                       
gen.writeString(jid.toString());
+                                               }
+                                               gen.writeEndArray();
+
+                                               
gen.writeArrayFieldStart("jobs-failed");
+                                               for (JobID jid : 
overview.getJobsFailed()) {
+                                                       
gen.writeString(jid.toString());
+                                               }
+                                               gen.writeEndArray();
+
+                                               gen.writeEndObject();
+
+                                               gen.close();
+                                               return writer.toString();
+                                       }
+                                       else {
+                                               throw new Exception("No 
connection to the leading JobManager.");
+                                       }
+                               }
+                               catch (Exception e) {
+                                       throw new FlinkFutureException("Failed 
to fetch list of all running jobs.", e);
+                               }
+                       },
+                       executor);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
new file mode 100644
index 0000000..6f85320
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns a summary of the job status.
+ */
+public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
+
+       private static final String ALL_JOBS_REST_PATH = "/joboverview";
+       private static final String RUNNING_JOBS_REST_PATH = 
"/joboverview/running";
+       private static final String COMPLETED_JOBS_REST_PATH = 
"/joboverview/completed";
+
+       private final Time timeout;
+
+       private final boolean includeRunningJobs;
+       private final boolean includeFinishedJobs;
+
+       public CurrentJobsOverviewHandler(
+                       Executor executor,
+                       Time timeout,
+                       boolean includeRunningJobs,
+                       boolean includeFinishedJobs) {
+
+               super(executor);
+               this.timeout = checkNotNull(timeout);
+               this.includeRunningJobs = includeRunningJobs;
+               this.includeFinishedJobs = includeFinishedJobs;
+       }
+
+       @Override
+       public String[] getPaths() {
+               if (includeRunningJobs && includeFinishedJobs) {
+                       return new String[]{ALL_JOBS_REST_PATH};
+               }
+               if (includeRunningJobs) {
+                       return new String[]{RUNNING_JOBS_REST_PATH};
+               } else {
+                       return new String[]{COMPLETED_JOBS_REST_PATH};
+               }
+       }
+
+       @Override
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               if (jobManagerGateway != null) {
+                       CompletableFuture<MultipleJobsDetails> jobDetailsFuture 
= jobManagerGateway.requestJobDetails(includeRunningJobs, includeFinishedJobs, 
timeout);
+
+                       return jobDetailsFuture.thenApplyAsync(
+                               (MultipleJobsDetails result) -> {
+                                       final long now = 
System.currentTimeMillis();
+
+                                       StringWriter writer = new 
StringWriter();
+                                       try {
+                                               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+                                               gen.writeStartObject();
+
+                                               if (includeRunningJobs && 
includeFinishedJobs) {
+                                                       
gen.writeArrayFieldStart("running");
+                                                       for (JobDetails detail 
: result.getRunningJobs()) {
+                                                               
writeJobDetailOverviewAsJson(detail, gen, now);
+                                                       }
+                                                       gen.writeEndArray();
+
+                                                       
gen.writeArrayFieldStart("finished");
+                                                       for (JobDetails detail 
: result.getFinishedJobs()) {
+                                                               
writeJobDetailOverviewAsJson(detail, gen, now);
+                                                       }
+                                                       gen.writeEndArray();
+                                               } else {
+                                                       
gen.writeArrayFieldStart("jobs");
+                                                       for (JobDetails detail 
: includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
+                                                               
writeJobDetailOverviewAsJson(detail, gen, now);
+                                                       }
+                                                       gen.writeEndArray();
+                                               }
+
+                                               gen.writeEndObject();
+                                               gen.close();
+                                               return writer.toString();
+                                       } catch (IOException e) {
+                                               throw new 
FlinkFutureException("Could not write current jobs overview json.", e);
+                                       }
+                               },
+                               executor);
+               }
+               else {
+                       return FutureUtils.completedExceptionally(new 
Exception("No connection to the leading JobManager."));
+               }
+       }
+
+       /**
+        * Archivist for the CurrentJobsOverviewHandler.
+        */
+       public static class CurrentJobsOverviewJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       StringWriter writer = new StringWriter();
+                       try (JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
+                               gen.writeStartObject();
+                               gen.writeArrayFieldStart("running");
+                               gen.writeEndArray();
+                               gen.writeArrayFieldStart("finished");
+                               
writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, 
System.currentTimeMillis());
+                               gen.writeEndArray();
+                               gen.writeEndObject();
+                       }
+                       String json = writer.toString();
+                       String path = ALL_JOBS_REST_PATH;
+                       return Collections.singleton(new ArchivedJson(path, 
json));
+               }
+       }
+
+       public static void writeJobDetailOverviewAsJson(JobDetails details, 
JsonGenerator gen, long now) throws IOException {
+               gen.writeStartObject();
+
+               gen.writeStringField("jid", details.getJobId().toString());
+               gen.writeStringField("name", details.getJobName());
+               gen.writeStringField("state", details.getStatus().name());
+
+               gen.writeNumberField("start-time", details.getStartTime());
+               gen.writeNumberField("end-time", details.getEndTime());
+               gen.writeNumberField("duration", (details.getEndTime() <= 0 ? 
now : details.getEndTime()) - details.getStartTime());
+               gen.writeNumberField("last-modification", 
details.getLastUpdateTime());
+
+               gen.writeObjectFieldStart("tasks");
+               gen.writeNumberField("total", details.getNumTasks());
+
+               final int[] perState = 
details.getNumVerticesPerExecutionState();
+               gen.writeNumberField("pending", 
perState[ExecutionState.CREATED.ordinal()] +
+                               perState[ExecutionState.SCHEDULED.ordinal()] +
+                               perState[ExecutionState.DEPLOYING.ordinal()]);
+               gen.writeNumberField("running", 
perState[ExecutionState.RUNNING.ordinal()]);
+               gen.writeNumberField("finished", 
perState[ExecutionState.FINISHED.ordinal()]);
+               gen.writeNumberField("canceling", 
perState[ExecutionState.CANCELING.ordinal()]);
+               gen.writeNumberField("canceled", 
perState[ExecutionState.CANCELED.ordinal()]);
+               gen.writeNumberField("failed", 
perState[ExecutionState.FAILED.ordinal()]);
+               gen.writeEndObject();
+
+               gen.writeEndObject();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
new file mode 100644
index 0000000..e8854f4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/DashboardConfigHandler.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Responder that returns the parameters that define how the asynchronous 
requests
+ * against this web server should behave. It defines for example the refresh 
interval,
+ * and time zone of the server timestamps.
+ */
+public class DashboardConfigHandler extends AbstractJsonRequestHandler {
+
+       private static final String DASHBOARD_CONFIG_REST_PATH = "/config";
+
+       private final String configString;
+
+       public DashboardConfigHandler(Executor executor, long refreshInterval) {
+               super(executor);
+               try {
+                       this.configString = createConfigJson(refreshInterval);
+               }
+               catch (Exception e) {
+                       // should never happen
+                       throw new RuntimeException(e.getMessage(), e);
+               }
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{DASHBOARD_CONFIG_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               return CompletableFuture.completedFuture(configString);
+       }
+
+       public static String createConfigJson(long refreshInterval) throws 
IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               TimeZone timeZone = TimeZone.getDefault();
+               String timeZoneName = timeZone.getDisplayName();
+               long timeZoneOffset = timeZone.getRawOffset();
+
+               gen.writeStartObject();
+               gen.writeNumberField("refresh-interval", refreshInterval);
+               gen.writeNumberField("timezone-offset", timeZoneOffset);
+               gen.writeStringField("timezone-name", timeZoneName);
+               gen.writeStringField("flink-version", 
EnvironmentInformation.getVersion());
+
+               EnvironmentInformation.RevisionInformation revision = 
EnvironmentInformation.getRevisionInformation();
+               if (revision != null) {
+                       gen.writeStringField("flink-revision", 
revision.commitId + " @ " + revision.commitDate);
+               }
+
+               gen.writeEndObject();
+
+               gen.close();
+
+               return writer.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java
new file mode 100644
index 0000000..8a47e50
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.WeakHashMap;
+import java.util.concurrent.CompletableFuture;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Gateway to obtaining an {@link ExecutionGraph} from a source, like 
JobManager or Archive.
+ *
+ * <p>The holder will cache the ExecutionGraph behind a weak reference, which 
will be cleared
+ * at some point once no one else is pointing to the ExecutionGraph.
+ * Note that while the holder runs in the same JVM as the JobManager or 
Archive, the reference should
+ * stay valid.
+ */
+public class ExecutionGraphHolder {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(ExecutionGraphHolder.class);
+
+       private final Time timeout;
+
+       private final WeakHashMap<JobID, AccessExecutionGraph> cache = new 
WeakHashMap<>();
+
+       public ExecutionGraphHolder(Time timeout) {
+               this.timeout = checkNotNull(timeout);
+       }
+
+       /**
+        * Retrieves the execution graph with {@link JobID} jid wrapped in 
{@link Optional} or
+        * {@link Optional#empty()} if it cannot be found.
+        *
+        * @param jid jobID of the execution graph to be retrieved
+        * @return Optional ExecutionGraph if it has been retrievable, empty if 
there has been no ExecutionGraph
+        */
+       public CompletableFuture<Optional<AccessExecutionGraph>> 
getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) {
+               AccessExecutionGraph cached = cache.get(jid);
+               if (cached != null) {
+                       if (cached.getState() == JobStatus.SUSPENDED) {
+                               cache.remove(jid);
+                       } else {
+                               return 
CompletableFuture.completedFuture(Optional.of(cached));
+                       }
+               }
+
+               CompletableFuture<Optional<AccessExecutionGraph>> 
executionGraphFuture = jobManagerGateway.requestJob(jid, timeout);
+
+               executionGraphFuture.thenAcceptAsync(
+                       optExecutionGraph ->
+                               optExecutionGraph.ifPresent(executionGraph -> 
cache.put(jid, executionGraph)));
+
+               return executionGraphFuture;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
new file mode 100644
index 0000000..0a3b050
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the aggregated user accumulators of a job.
+ */
+public class JobAccumulatorsHandler extends 
AbstractExecutionGraphRequestHandler {
+
+       private static final String JOB_ACCUMULATORS_REST_PATH = 
"/jobs/:jobid/accumulators";
+
+       public JobAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_ACCUMULATORS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return createJobAccumulatorsJson(graph);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create job accumulators json.", e);
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Archivist for the JobAccumulatorsHandler.
+        */
+       public static class JobAccumulatorsJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String json = createJobAccumulatorsJson(graph);
+                       String path = JOB_ACCUMULATORS_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       return Collections.singletonList(new ArchivedJson(path, 
json));
+               }
+       }
+
+       public static String createJobAccumulatorsJson(AccessExecutionGraph 
graph) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               StringifiedAccumulatorResult[] allAccumulators = 
graph.getAccumulatorResultsStringified();
+
+               gen.writeStartObject();
+
+               gen.writeArrayFieldStart("job-accumulators");
+               // empty for now
+               gen.writeEndArray();
+
+               gen.writeArrayFieldStart("user-task-accumulators");
+               for (StringifiedAccumulatorResult acc : allAccumulators) {
+                       gen.writeStartObject();
+                       gen.writeStringField("name", acc.getName());
+                       gen.writeStringField("type", acc.getType());
+                       gen.writeStringField("value", acc.getValue());
+                       gen.writeEndObject();
+               }
+               gen.writeEndArray();
+               gen.writeEndObject();
+
+               gen.close();
+               return writer.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
new file mode 100644
index 0000000..a194f30
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the CANCEL request.
+ */
+public class JobCancellationHandler extends AbstractJsonRequestHandler {
+
+       private static final String JOB_CONCELLATION_REST_PATH = 
"/jobs/:jobid/cancel";
+       private static final String JOB_CONCELLATION_YARN_REST_PATH = 
"/jobs/:jobid/yarn-cancel";
+
+       private final Time timeout;
+
+       public JobCancellationHandler(Executor executor, Time timeout) {
+               super(executor);
+               this.timeout = Preconditions.checkNotNull(timeout);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_CONCELLATION_REST_PATH, 
JOB_CONCELLATION_YARN_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       JobID jobId = new 
JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+                                       if (jobManagerGateway != null) {
+                                               
jobManagerGateway.cancelJob(jobId, timeout);
+                                               return "{}";
+                                       }
+                                       else {
+                                               throw new Exception("No 
connection to the leading JobManager.");
+                                       }
+                               }
+                               catch (Exception e) {
+                                       throw new FlinkFutureException("Failed 
to cancel the job with id: "  + pathParams.get("jobid"), e);
+                               }
+                       },
+                       executor);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
new file mode 100644
index 0000000..23e94f5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.CancelJobWithSavepoint;
+import org.apache.flink.runtime.rest.NotFoundException;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.nio.charset.Charset;
+import java.util.ArrayDeque;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler for {@link CancelJobWithSavepoint} messages.
+ */
+public class JobCancellationWithSavepointHandlers {
+
+       private static final String CANCEL_WITH_SAVEPOINT_REST_PATH = 
"/jobs/:jobid/cancel-with-savepoint";
+       private static final String CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH = 
"/jobs/:jobid/cancel-with-savepoint/target-directory/:targetDirectory";
+
+       /** URL for in-progress cancellations. */
+       private static final String CANCELLATION_IN_PROGRESS_REST_PATH = 
"/jobs/:jobid/cancel-with-savepoint/in-progress/:requestId";
+
+       /** Encodings for String. */
+       private static final Charset ENCODING = ConfigConstants.DEFAULT_CHARSET;
+
+       /** Shared lock between Trigger and In-Progress handlers. */
+       private final Object lock = new Object();
+
+       /** In-Progress requests. */
+       private final Map<JobID, Long> inProgress = new HashMap<>();
+
+       /** Succeeded/failed request. Either String or Throwable. */
+       private final Map<Long, Object> completed = new HashMap<>();
+
+       /** Atomic request counter. */
+       private long requestCounter;
+
+       /** Handler for trigger requests. */
+       private final TriggerHandler triggerHandler;
+
+       /** Handler for in-progress requests. */
+       private final InProgressHandler inProgressHandler;
+
+       /** Default savepoint directory. */
+       private final String defaultSavepointDirectory;
+
+       public JobCancellationWithSavepointHandlers(
+                       ExecutionGraphHolder currentGraphs,
+                       Executor executor) {
+               this(currentGraphs, executor, null);
+       }
+
+       public JobCancellationWithSavepointHandlers(
+                       ExecutionGraphHolder currentGraphs,
+                       Executor executor,
+                       @Nullable String defaultSavepointDirectory) {
+
+               this.triggerHandler = new TriggerHandler(currentGraphs, 
executor);
+               this.inProgressHandler = new InProgressHandler();
+               this.defaultSavepointDirectory = defaultSavepointDirectory;
+       }
+
+       public TriggerHandler getTriggerHandler() {
+               return triggerHandler;
+       }
+
+       public InProgressHandler getInProgressHandler() {
+               return inProgressHandler;
+       }
+
+       // 
------------------------------------------------------------------------
+       // New requests
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Handler for triggering a {@link CancelJobWithSavepoint} message.
+        */
+       class TriggerHandler implements RequestHandler {
+
+               /** Current execution graphs. */
+               private final ExecutionGraphHolder currentGraphs;
+
+               /** Execution context for futures. */
+               private final Executor executor;
+
+               public TriggerHandler(ExecutionGraphHolder currentGraphs, 
Executor executor) {
+                       this.currentGraphs = checkNotNull(currentGraphs);
+                       this.executor = checkNotNull(executor);
+               }
+
+               @Override
+               public String[] getPaths() {
+                       return new String[]{CANCEL_WITH_SAVEPOINT_REST_PATH, 
CANCEL_WITH_SAVEPOINT_DIRECTORY_REST_PATH};
+               }
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public CompletableFuture<FullHttpResponse> handleRequest(
+                               Map<String, String> pathParams,
+                               Map<String, String> queryParams,
+                               JobManagerGateway jobManagerGateway) {
+
+                       if (jobManagerGateway != null) {
+                               JobID jobId = 
JobID.fromHexString(pathParams.get("jobid"));
+                               final 
CompletableFuture<Optional<AccessExecutionGraph>> graphFuture;
+
+                               graphFuture = 
currentGraphs.getExecutionGraph(jobId, jobManagerGateway);
+
+                               return graphFuture.thenApplyAsync(
+                                       (Optional<AccessExecutionGraph> 
optGraph) -> {
+                                               final AccessExecutionGraph 
graph = optGraph.orElseThrow(
+                                                       () -> new 
FlinkFutureException(
+                                                               new 
NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')));
+
+                                               CheckpointCoordinator coord = 
graph.getCheckpointCoordinator();
+                                               if (coord == null) {
+                                                       throw new 
FlinkFutureException(new Exception("Cannot find CheckpointCoordinator for 
job."));
+                                               }
+
+                                               String targetDirectory = 
pathParams.get("targetDirectory");
+                                               if (targetDirectory == null) {
+                                                       if 
(defaultSavepointDirectory == null) {
+                                                               throw new 
IllegalStateException("No savepoint directory configured. " +
+                                                                       "You 
can either specify a directory when triggering this savepoint or " +
+                                                                       
"configure a cluster-wide default via key '" +
+                                                                       
CoreOptions.SAVEPOINT_DIRECTORY.key() + "'.");
+                                                       } else {
+                                                               targetDirectory 
= defaultSavepointDirectory;
+                                                       }
+                                               }
+
+                                               try {
+                                                       return 
handleNewRequest(jobManagerGateway, jobId, targetDirectory, 
coord.getCheckpointTimeout());
+                                               } catch (IOException e) {
+                                                       throw new 
FlinkFutureException("Could not cancel job with savepoint.", e);
+                                               }
+                                       }, executor);
+                       } else {
+                               return FutureUtils.completedExceptionally(new 
Exception("No connection to the leading JobManager."));
+                       }
+               }
+
+               @SuppressWarnings("unchecked")
+               private FullHttpResponse handleNewRequest(JobManagerGateway 
jobManagerGateway, final JobID jobId, String targetDirectory, long 
checkpointTimeout) throws IOException {
+                       // Check whether a request exists
+                       final long requestId;
+                       final boolean isNewRequest;
+                       synchronized (lock) {
+                               if (inProgress.containsKey(jobId)) {
+                                       requestId = inProgress.get(jobId);
+                                       isNewRequest = false;
+                               } else {
+                                       requestId = ++requestCounter;
+                                       inProgress.put(jobId, requestId);
+                                       isNewRequest = true;
+                               }
+                       }
+
+                       if (isNewRequest) {
+                               boolean success = false;
+
+                               try {
+                                       // Trigger cancellation
+                                       CompletableFuture<String> 
cancelJobFuture = jobManagerGateway
+                                               .cancelJobWithSavepoint(jobId, 
targetDirectory, Time.milliseconds(checkpointTimeout));
+
+                                       cancelJobFuture.whenCompleteAsync(
+                                               (String path, Throwable 
throwable) -> {
+                                                       try {
+                                                               if (throwable 
!= null) {
+                                                                       
completed.put(requestId, throwable);
+                                                               } else {
+                                                                       
completed.put(requestId, path);
+                                                               }
+                                                       } finally {
+                                                               
inProgress.remove(jobId);
+                                                       }
+                                               }, executor);
+
+                                       success = true;
+                               } finally {
+                                       synchronized (lock) {
+                                               if (!success) {
+                                                       
inProgress.remove(jobId);
+                                               }
+                                       }
+                               }
+                       }
+
+                       // In-progress location
+                       String location = CANCELLATION_IN_PROGRESS_REST_PATH
+                                       .replace(":jobid", jobId.toString())
+                                       .replace(":requestId", 
Long.toString(requestId));
+
+                       // Accepted response
+                       StringWriter writer = new StringWriter();
+                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+                       gen.writeStartObject();
+                       gen.writeStringField("status", "accepted");
+                       gen.writeNumberField("request-id", requestId);
+                       gen.writeStringField("location", location);
+                       gen.writeEndObject();
+                       gen.close();
+
+                       String json = writer.toString();
+                       byte[] bytes = json.getBytes(ENCODING);
+
+                       DefaultFullHttpResponse response = new 
DefaultFullHttpResponse(
+                                       HttpVersion.HTTP_1_1,
+                                       HttpResponseStatus.ACCEPTED,
+                                       Unpooled.wrappedBuffer(bytes));
+
+                       response.headers().set(HttpHeaders.Names.LOCATION, 
location);
+
+                       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json; charset=" + ENCODING.name());
+                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
+
+                       FullHttpResponse accepted = response;
+
+                       return accepted;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       // In-progress requests
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Handler for in-progress cancel with savepoint operations.
+        */
+       class InProgressHandler implements RequestHandler {
+
+               /** The number of recent checkpoints whose IDs are remembered. 
*/
+               private static final int NUM_GHOST_REQUEST_IDS = 16;
+
+               /** Remember some recently completed. */
+               private final ArrayDeque<Tuple2<Long, Object>> 
recentlyCompleted = new ArrayDeque<>(NUM_GHOST_REQUEST_IDS);
+
+               @Override
+               public String[] getPaths() {
+                       return new String[]{CANCELLATION_IN_PROGRESS_REST_PATH};
+               }
+
+               @Override
+               @SuppressWarnings("unchecked")
+               public CompletableFuture<FullHttpResponse> 
handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, 
JobManagerGateway jobManagerGateway) {
+                       JobID jobId = 
JobID.fromHexString(pathParams.get("jobid"));
+                       long requestId = 
Long.parseLong(pathParams.get("requestId"));
+
+                       return CompletableFuture.supplyAsync(
+                               () -> {
+                                       try {
+                                               synchronized (lock) {
+                                                       Object result = 
completed.remove(requestId);
+
+                                                       if (result != null) {
+                                                               // Add to 
recent history
+                                                               
recentlyCompleted.add(new Tuple2<>(requestId, result));
+                                                               if 
(recentlyCompleted.size() > NUM_GHOST_REQUEST_IDS) {
+                                                                       
recentlyCompleted.remove();
+                                                               }
+
+                                                               if 
(result.getClass() == String.class) {
+                                                                       String 
savepointPath = (String) result;
+                                                                       return 
createSuccessResponse(requestId, savepointPath);
+                                                               } else {
+                                                                       
Throwable cause = (Throwable) result;
+                                                                       return 
createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, requestId, 
cause.getMessage());
+                                                               }
+                                                       } else {
+                                                               // Check 
in-progress
+                                                               Long 
inProgressRequestId = inProgress.get(jobId);
+                                                               if 
(inProgressRequestId != null) {
+                                                                       // 
Sanity check
+                                                                       if 
(inProgressRequestId == requestId) {
+                                                                               
return createInProgressResponse(requestId);
+                                                                       } else {
+                                                                               
String msg = "Request ID does not belong to JobID";
+                                                                               
return createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, msg);
+                                                                       }
+                                                               }
+
+                                                               // Check recent 
history
+                                                               for 
(Tuple2<Long, Object> recent : recentlyCompleted) {
+                                                                       if 
(recent.f0 == requestId) {
+                                                                               
if (recent.f1.getClass() == String.class) {
+                                                                               
        String savepointPath = (String) recent.f1;
+                                                                               
        return createSuccessResponse(requestId, savepointPath);
+                                                                               
} else {
+                                                                               
        Throwable cause = (Throwable) recent.f1;
+                                                                               
        return createFailureResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, 
requestId, cause.getMessage());
+                                                                               
}
+                                                                       }
+                                                               }
+
+                                                               return 
createFailureResponse(HttpResponseStatus.BAD_REQUEST, requestId, "Unknown 
job/request ID");
+                                                       }
+                                               }
+                                       } catch (Exception e) {
+                                               throw new 
FlinkFutureException("Could not handle in progress request.", e);
+                                       }
+                               });
+               }
+
+               private FullHttpResponse createSuccessResponse(long requestId, 
String savepointPath) throws IOException {
+                       StringWriter writer = new StringWriter();
+                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+                       gen.writeStartObject();
+
+                       gen.writeStringField("status", "success");
+                       gen.writeNumberField("request-id", requestId);
+                       gen.writeStringField("savepoint-path", savepointPath);
+
+                       gen.writeEndObject();
+                       gen.close();
+
+                       String json = writer.toString();
+                       byte[] bytes = json.getBytes(ENCODING);
+
+                       DefaultFullHttpResponse response = new 
DefaultFullHttpResponse(
+                                       HttpVersion.HTTP_1_1,
+                                       HttpResponseStatus.CREATED,
+                                       Unpooled.wrappedBuffer(bytes));
+
+                       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json; charset=" + ENCODING.name());
+                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
+
+                       return response;
+               }
+
+               private FullHttpResponse createInProgressResponse(long 
requestId) throws IOException {
+                       StringWriter writer = new StringWriter();
+                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+                       gen.writeStartObject();
+
+                       gen.writeStringField("status", "in-progress");
+                       gen.writeNumberField("request-id", requestId);
+
+                       gen.writeEndObject();
+                       gen.close();
+
+                       String json = writer.toString();
+                       byte[] bytes = json.getBytes(ENCODING);
+
+                       DefaultFullHttpResponse response = new 
DefaultFullHttpResponse(
+                                       HttpVersion.HTTP_1_1,
+                                       HttpResponseStatus.ACCEPTED,
+                                       Unpooled.wrappedBuffer(bytes));
+
+                       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json; charset=" + ENCODING.name());
+                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
+
+                       return response;
+               }
+
+               private FullHttpResponse 
createFailureResponse(HttpResponseStatus code, long requestId, String errMsg) 
throws IOException {
+                       StringWriter writer = new StringWriter();
+                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+                       gen.writeStartObject();
+
+                       gen.writeStringField("status", "failed");
+                       gen.writeNumberField("request-id", requestId);
+                       gen.writeStringField("cause", errMsg);
+
+                       gen.writeEndObject();
+                       gen.close();
+
+                       String json = writer.toString();
+                       byte[] bytes = json.getBytes(ENCODING);
+
+                       DefaultFullHttpResponse response = new 
DefaultFullHttpResponse(
+                                       HttpVersion.HTTP_1_1,
+                                       code,
+                                       Unpooled.wrappedBuffer(bytes));
+
+                       response.headers().set(HttpHeaders.Names.CONTENT_TYPE, 
"application/json; charset=" + ENCODING.name());
+                       
response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 
response.content().readableBytes());
+
+                       return response;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
new file mode 100644
index 0000000..bb1cf8f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the execution config of a job.
+ */
+public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
+
+       private static final String JOB_CONFIG_REST_PATH = 
"/jobs/:jobid/config";
+
+       public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, 
Executor executor) {
+               super(executionGraphHolder, executor);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_CONFIG_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return createJobConfigJson(graph);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not write job config json.", e);
+                               }
+                       },
+                       executor);
+
+       }
+
+       /**
+        * Archivist for the JobConfigHandler.
+        */
+       public static class JobConfigJsonArchivist implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String json = createJobConfigJson(graph);
+                       String path = JOB_CONFIG_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       return Collections.singletonList(new ArchivedJson(path, 
json));
+               }
+       }
+
+       public static String createJobConfigJson(AccessExecutionGraph graph) 
throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               gen.writeStartObject();
+               gen.writeStringField("jid", graph.getJobID().toString());
+               gen.writeStringField("name", graph.getJobName());
+
+               final ArchivedExecutionConfig summary = 
graph.getArchivedExecutionConfig();
+
+               if (summary != null) {
+                       gen.writeObjectFieldStart("execution-config");
+
+                       gen.writeStringField("execution-mode", 
summary.getExecutionMode());
+
+                       gen.writeStringField("restart-strategy", 
summary.getRestartStrategyDescription());
+                       gen.writeNumberField("job-parallelism", 
summary.getParallelism());
+                       gen.writeBooleanField("object-reuse-mode", 
summary.getObjectReuseEnabled());
+
+                       Map<String, String> ucVals = 
summary.getGlobalJobParameters();
+                       if (ucVals != null) {
+                               gen.writeObjectFieldStart("user-config");
+
+                               for (Map.Entry<String, String> ucVal : 
ucVals.entrySet()) {
+                                       gen.writeStringField(ucVal.getKey(), 
ucVal.getValue());
+                               }
+
+                               gen.writeEndObject();
+                       }
+
+                       gen.writeEndObject();
+               }
+               gen.writeEndObject();
+
+               gen.close();
+               return writer.toString();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
new file mode 100644
index 0000000..dd6aee8
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns details about a job. This includes:
+ * <ul>
+ *     <li>Dataflow plan</li>
+ *     <li>id, name, and current status</li>
+ *     <li>start time, end time, duration</li>
+ *     <li>number of job vertices in each state (pending, running, finished, 
failed)</li>
+ *     <li>info about job vertices, including runtime, status, I/O bytes and 
records, subtasks in each status</li>
+ * </ul>
+ */
+public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
+
+       private static final String JOB_DETAILS_REST_PATH = "/jobs/:jobid";
+       private static final String JOB_DETAILS_VERTICES_REST_PATH = 
"/jobs/:jobid/vertices";
+
+       private final MetricFetcher fetcher;
+
+       public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, 
Executor executor, MetricFetcher fetcher) {
+               super(executionGraphHolder, executor);
+               this.fetcher = fetcher;
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_DETAILS_REST_PATH, 
JOB_DETAILS_VERTICES_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return createJobDetailsJson(graph, 
fetcher);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create job details json.", e);
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Archivist for the JobDetailsHandler.
+        */
+       public static class JobDetailsJsonArchivist implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String json = createJobDetailsJson(graph, null);
+                       String path1 = JOB_DETAILS_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       String path2 = JOB_DETAILS_VERTICES_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       Collection<ArchivedJson> archives = new ArrayList<>();
+                       archives.add(new ArchivedJson(path1, json));
+                       archives.add(new ArchivedJson(path2, json));
+                       return archives;
+               }
+       }
+
+       public static String createJobDetailsJson(AccessExecutionGraph graph, 
@Nullable MetricFetcher fetcher) throws IOException {
+               final StringWriter writer = new StringWriter();
+               final JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               final long now = System.currentTimeMillis();
+
+               gen.writeStartObject();
+
+               // basic info
+               gen.writeStringField("jid", graph.getJobID().toString());
+               gen.writeStringField("name", graph.getJobName());
+               gen.writeBooleanField("isStoppable", graph.isStoppable());
+               gen.writeStringField("state", graph.getState().name());
+
+               // times and duration
+               final long jobStartTime = 
graph.getStatusTimestamp(JobStatus.CREATED);
+               final long jobEndTime = 
graph.getState().isGloballyTerminalState() ?
+                               graph.getStatusTimestamp(graph.getState()) : 
-1L;
+               gen.writeNumberField("start-time", jobStartTime);
+               gen.writeNumberField("end-time", jobEndTime);
+               gen.writeNumberField("duration", (jobEndTime > 0 ? jobEndTime : 
now) - jobStartTime);
+               gen.writeNumberField("now", now);
+
+               // timestamps
+               gen.writeObjectFieldStart("timestamps");
+               for (JobStatus status : JobStatus.values()) {
+                       gen.writeNumberField(status.name(), 
graph.getStatusTimestamp(status));
+               }
+               gen.writeEndObject();
+
+               // job vertices
+               int[] jobVerticesPerState = new 
int[ExecutionState.values().length];
+               gen.writeArrayFieldStart("vertices");
+
+               for (AccessExecutionJobVertex ejv : 
graph.getVerticesTopologically()) {
+                       int[] tasksPerState = new 
int[ExecutionState.values().length];
+                       long startTime = Long.MAX_VALUE;
+                       long endTime = 0;
+                       boolean allFinished = true;
+
+                       for (AccessExecutionVertex vertex : 
ejv.getTaskVertices()) {
+                               final ExecutionState state = 
vertex.getExecutionState();
+                               tasksPerState[state.ordinal()]++;
+
+                               // take the earliest start time
+                               long started = 
vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+                               if (started > 0) {
+                                       startTime = Math.min(startTime, 
started);
+                               }
+
+                               allFinished &= state.isTerminal();
+                               endTime = Math.max(endTime, 
vertex.getStateTimestamp(state));
+                       }
+
+                       long duration;
+                       if (startTime < Long.MAX_VALUE) {
+                               if (allFinished) {
+                                       duration = endTime - startTime;
+                               }
+                               else {
+                                       endTime = -1L;
+                                       duration = now - startTime;
+                               }
+                       }
+                       else {
+                               startTime = -1L;
+                               endTime = -1L;
+                               duration = -1L;
+                       }
+
+                       ExecutionState jobVertexState =
+                                       
ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, 
ejv.getParallelism());
+                       jobVerticesPerState[jobVertexState.ordinal()]++;
+
+                       gen.writeStartObject();
+                       gen.writeStringField("id", 
ejv.getJobVertexId().toString());
+                       gen.writeStringField("name", ejv.getName());
+                       gen.writeNumberField("parallelism", 
ejv.getParallelism());
+                       gen.writeStringField("status", jobVertexState.name());
+
+                       gen.writeNumberField("start-time", startTime);
+                       gen.writeNumberField("end-time", endTime);
+                       gen.writeNumberField("duration", duration);
+
+                       gen.writeObjectFieldStart("tasks");
+                       for (ExecutionState state : ExecutionState.values()) {
+                               gen.writeNumberField(state.name(), 
tasksPerState[state.ordinal()]);
+                       }
+                       gen.writeEndObject();
+
+                       MutableIOMetrics counts = new MutableIOMetrics();
+
+                       for (AccessExecutionVertex vertex : 
ejv.getTaskVertices()) {
+                               counts.addIOMetrics(
+                                       vertex.getCurrentExecutionAttempt(),
+                                       fetcher,
+                                       graph.getJobID().toString(),
+                                       ejv.getJobVertexId().toString());
+                       }
+
+                       counts.writeIOMetricsAsJson(gen);
+
+                       gen.writeEndObject();
+               }
+               gen.writeEndArray();
+
+               gen.writeObjectFieldStart("status-counts");
+               for (ExecutionState state : ExecutionState.values()) {
+                       gen.writeNumberField(state.name(), 
jobVerticesPerState[state.ordinal()]);
+               }
+               gen.writeEndObject();
+
+               gen.writeFieldName("plan");
+               gen.writeRawValue(graph.getJsonPlan());
+
+               gen.writeEndObject();
+
+               gen.close();
+               return writer.toString();
+       }
+}

Reply via email to