http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
new file mode 100644
index 0000000..3236062
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.ExceptionUtils;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the exceptions of a job.
+ */
+public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
+
+       private static final String JOB_EXCEPTIONS_REST_PATH = "/jobs/:jobid/exceptions";
+
+       static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
+
+       public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_EXCEPTIONS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return createJobExceptionsJson(graph);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could not create job exceptions json.", e);
+                               }
+                       },
+                       executor
+               );
+       }
+
+       /**
+        * Archivist for the JobExceptionsHandler.
+        */
+       public static class JobExceptionsJsonArchivist implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String json = createJobExceptionsJson(graph);
+                       String path = JOB_EXCEPTIONS_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       return Collections.singletonList(new ArchivedJson(path, json));
+               }
+       }
+
+       public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               gen.writeStartObject();
+
+               // most important is the root failure cause
+               ErrorInfo rootException = graph.getFailureCause();
+               if (rootException != null && !rootException.getExceptionAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+                       gen.writeStringField("root-exception", rootException.getExceptionAsString());
+                       gen.writeNumberField("timestamp", rootException.getTimestamp());
+               }
+
+               // we additionally collect all exceptions (up to a limit) that occurred in the individual tasks
+               gen.writeArrayFieldStart("all-exceptions");
+
+               int numExceptionsSoFar = 0;
+               boolean truncated = false;
+
+               for (AccessExecutionVertex task : graph.getAllExecutionVertices()) {
+                       String t = task.getFailureCauseAsString();
+                       if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+                               if (numExceptionsSoFar >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
+                                       truncated = true;
+                                       break;
+                               }
+
+                               TaskManagerLocation location = task.getCurrentAssignedResourceLocation();
+                               String locationString = location != null ?
+                                               location.getFQDNHostname() + ':' + location.dataPort() : "(unassigned)";
+
+                               gen.writeStartObject();
+                               gen.writeStringField("exception", t);
+                               gen.writeStringField("task", task.getTaskNameWithSubtaskIndex());
+                               gen.writeStringField("location", locationString);
+                               long timestamp = task.getStateTimestamp(ExecutionState.FAILED);
+                               gen.writeNumberField("timestamp", timestamp == 0 ? -1 : timestamp);
+                               gen.writeEndObject();
+                               numExceptionsSoFar++;
+                       }
+               }
+               gen.writeEndArray();
+
+               gen.writeBooleanField("truncated", truncated);
+               gen.writeEndObject();
+
+               gen.close();
+               return writer.toString();
+       }
+}
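
For reference, the document produced by createJobExceptionsJson() above has roughly this shape (values are illustrative, reconstructed from the generator calls; "root-exception" and the top-level "timestamp" only appear when the graph has a failure cause):

    {
      "root-exception": "java.lang.RuntimeException: ...",
      "timestamp": 1499999999999,
      "all-exceptions": [
        {
          "exception": "java.io.IOException: ...",
          "task": "Map (2/4)",
          "location": "worker-1.example.com:42424",
          "timestamp": 1499999999999
        }
      ],
      "truncated": false
    }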

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
new file mode 100644
index 0000000..364af91
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobManagerConfigHandler.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Returns the Job Manager's configuration.
+ */
+public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
+
+       private static final String JOBMANAGER_CONFIG_REST_PATH = "/jobmanager/config";
+
+       private final Configuration config;
+
+       public JobManagerConfigHandler(Executor executor, Configuration config) {
+               super(executor);
+               this.config = config;
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOBMANAGER_CONFIG_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       StringWriter writer = new StringWriter();
+                                       JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+                                       gen.writeStartArray();
+                                       for (String key : config.keySet()) {
+                                               gen.writeStartObject();
+                                               gen.writeStringField("key", key);
+
+                                               // Mask key values which contain sensitive information
+                                               if (key.toLowerCase().contains("password")) {
+                                                       String value = config.getString(key, null);
+                                                       if (value != null) {
+                                                               value = "******";
+                                                       }
+                                                       gen.writeStringField("value", value);
+                                               } else {
+                                                       gen.writeStringField("value", config.getString(key, null));
+                                               }
+                                               gen.writeEndObject();
+                                       }
+                                       gen.writeEndArray();
+
+                                       gen.close();
+                                       return writer.toString();
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could not write configuration.", e);
+                               }
+                       },
+                       executor);
+       }
+}
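
A sketch of the array this handler returns, reconstructed from the generator calls above (keys and values are made up; keys containing "password" are masked):

    [
      { "key": "jobmanager.rpc.address", "value": "localhost" },
      { "key": "some.service.password", "value": "******" }
    ]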

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java
new file mode 100644
index 0000000..d9db1ff
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the JSON program plan of a job graph.
+ */
+public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
+
+       private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan";
+
+       public JobPlanHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_PLAN_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
+               return CompletableFuture.completedFuture(graph.getJsonPlan());
+       }
+
+       /**
+        * Archivist for the JobPlanHandler.
+        */
+       public static class JobPlanJsonArchivist implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String path = JOB_PLAN_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       String json = graph.getJsonPlan();
+                       return Collections.singletonList(new ArchivedJson(path, json));
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java
new file mode 100644
index 0000000..cc41a1c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobStoppingHandler.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the STOP request.
+ */
+public class JobStoppingHandler extends AbstractJsonRequestHandler {
+
+       private static final String JOB_STOPPING_REST_PATH = "/jobs/:jobid/stop";
+       private static final String JOB_STOPPING_YARN_REST_PATH = "/jobs/:jobid/yarn-stop";
+
+       private final Time timeout;
+
+       public JobStoppingHandler(Executor executor, Time timeout) {
+               super(executor);
+               this.timeout = Preconditions.checkNotNull(timeout);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_STOPPING_REST_PATH, JOB_STOPPING_YARN_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       JobID jobId = new JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
+                                       if (jobManagerGateway != null) {
+                                               jobManagerGateway.stopJob(jobId, timeout);
+                                               return "{}";
+                                       }
+                                       else {
+                                               throw new Exception("No connection to the leading JobManager.");
+                                       }
+                               }
+                               catch (Exception e) {
+                                       throw new FlinkFutureException("Failed to stop the job with id: " + pathParams.get("jobid") + '.', e);
+                               }
+                       },
+                       executor);
+       }
+}
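
On success the handler answers with an empty JSON object. A stop request would look roughly like this (host, port, and job id are illustrative; 8081 is Flink's default web port):

    GET http://jobmanager-host:8081/jobs/7684be6004e4e955c2a558a9bc463f65/stop
    => {}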

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
new file mode 100644
index 0000000..9830ab4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the accumulators for a given vertex.
+ */
+public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandler {
+
+       private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators";
+
+       public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_VERTEX_ACCUMULATORS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return createVertexAccumulatorsJson(jobVertex);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could not create job vertex accumulators json.", e);
+                               }
+                       },
+                       executor);
+
+       }
+
+       /**
+        * Archivist for JobVertexAccumulatorsHandler.
+        */
+       public static class JobVertexAccumulatorsJsonArchivist implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+                               String json = createVertexAccumulatorsJson(task);
+                               String path = JOB_VERTEX_ACCUMULATORS_REST_PATH
+                                       .replace(":jobid", graph.getJobID().toString())
+                                       .replace(":vertexid", task.getJobVertexId().toString());
+                               archive.add(new ArchivedJson(path, json));
+                       }
+                       return archive;
+               }
+       }
+
+       public static String createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
+
+               gen.writeStartObject();
+               gen.writeStringField("id", jobVertex.getJobVertexId().toString());
+
+               gen.writeArrayFieldStart("user-accumulators");
+               for (StringifiedAccumulatorResult acc : accs) {
+                       gen.writeStartObject();
+                       gen.writeStringField("name", acc.getName());
+                       gen.writeStringField("type", acc.getType());
+                       gen.writeStringField("value", acc.getValue());
+                       gen.writeEndObject();
+               }
+               gen.writeEndArray();
+
+               gen.writeEndObject();
+
+               gen.close();
+               return writer.toString();
+       }
+}
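
The accumulator document produced above has roughly this shape (illustrative values):

    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "user-accumulators": [
        { "name": "numProcessedRecords", "type": "LongCounter", "value": "42" }
      ]
    }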

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java
new file mode 100644
index 0000000..59bfc0b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.BackPressureStatsTracker;
+import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns back pressure stats for a single job vertex and
+ * all its sub tasks.
+ */
+public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandler {
+
+       private static final String JOB_VERTEX_BACKPRESSURE_REST_PATH = "/jobs/:jobid/vertices/:vertexid/backpressure";
+
+       /** Back pressure stats tracker. */
+       private final BackPressureStatsTracker backPressureStatsTracker;
+
+       /** Time after which stats are considered outdated. */
+       private final int refreshInterval;
+
+       public JobVertexBackPressureHandler(
+                       ExecutionGraphHolder executionGraphHolder,
+                       Executor executor,
+                       BackPressureStatsTracker backPressureStatsTracker,
+                       int refreshInterval) {
+
+               super(executionGraphHolder, executor);
+               this.backPressureStatsTracker = checkNotNull(backPressureStatsTracker, "Stats tracker");
+               checkArgument(refreshInterval >= 0, "Negative timeout");
+               this.refreshInterval = refreshInterval;
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_VERTEX_BACKPRESSURE_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(
+                       AccessExecutionJobVertex accessJobVertex,
+                       Map<String, String> params) {
+               if (accessJobVertex instanceof ArchivedExecutionJobVertex) {
+                       return CompletableFuture.completedFuture("");
+               }
+               ExecutionJobVertex jobVertex = (ExecutionJobVertex) accessJobVertex;
+               try (StringWriter writer = new StringWriter();
+                               JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
+
+                       gen.writeStartObject();
+
+                       Optional<OperatorBackPressureStats> statsOption = backPressureStatsTracker
+                                       .getOperatorBackPressureStats(jobVertex);
+
+                       if (statsOption.isPresent()) {
+                               OperatorBackPressureStats stats = statsOption.get();
+
+                               // Check whether we need to refresh
+                               if (refreshInterval <= System.currentTimeMillis() - stats.getEndTimestamp()) {
+                                       backPressureStatsTracker.triggerStackTraceSample(jobVertex);
+                                       gen.writeStringField("status", "deprecated");
+                               } else {
+                                       gen.writeStringField("status", "ok");
+                               }
+
+                               gen.writeStringField("backpressure-level", getBackPressureLevel(stats.getMaxBackPressureRatio()));
+                               gen.writeNumberField("end-timestamp", stats.getEndTimestamp());
+
+                               // Sub tasks
+                               gen.writeArrayFieldStart("subtasks");
+                               int numSubTasks = stats.getNumberOfSubTasks();
+                               for (int i = 0; i < numSubTasks; i++) {
+                                       double ratio = stats.getBackPressureRatio(i);
+
+                                       gen.writeStartObject();
+                                       gen.writeNumberField("subtask", i);
+                                       gen.writeStringField("backpressure-level", getBackPressureLevel(ratio));
+                                       gen.writeNumberField("ratio", ratio);
+                                       gen.writeEndObject();
+                               }
+                               gen.writeEndArray();
+                       } else {
+                               backPressureStatsTracker.triggerStackTraceSample(jobVertex);
+                               gen.writeStringField("status", "deprecated");
+                       }
+
+                       gen.writeEndObject();
+                       gen.close();
+
+                       return CompletableFuture.completedFuture(writer.toString());
+               } catch (IOException e) {
+                       return FutureUtils.completedExceptionally(e);
+               }
+       }
+
+       /**
+        * Returns the back pressure level as a String.
+        *
+        * @param backPressureRatio Ratio of back pressured samples to the total number of samples.
+        *
+        * @return Back pressure level ('ok', 'low', or 'high')
+        */
+       static String getBackPressureLevel(double backPressureRatio) {
+               if (backPressureRatio <= 0.10) {
+                       return "ok";
+               } else if (backPressureRatio <= 0.5) {
+                       return "low";
+               } else {
+                       return "high";
+               }
+       }
+
+}
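
When fresh stats are available, the response written above looks roughly like this (illustrative values):

    {
      "status": "ok",
      "backpressure-level": "low",
      "end-timestamp": 1499999999999,
      "subtasks": [
        { "subtask": 0, "backpressure-level": "ok", "ratio": 0.05 },
        { "subtask": 1, "backpressure-level": "low", "ratio": 0.3 }
      ]
    }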

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
new file mode 100644
index 0000000..3f0c77c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * A request handler that provides the details of a job vertex, including id, name, parallelism,
+ * and the runtime and metrics of all its subtasks.
+ */
+public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
+
+       private static final String JOB_VERTEX_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid";
+
+       private final MetricFetcher fetcher;
+
+       public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+               super(executionGraphHolder, executor);
+               this.fetcher = fetcher;
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_VERTEX_DETAILS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could not write the vertex details json.", e);
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Archivist for the JobVertexDetailsHandler.
+        */
+       public static class JobVertexDetailsJsonArchivist implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+                               String json = createVertexDetailsJson(task, graph.getJobID().toString(), null);
+                               String path = JOB_VERTEX_DETAILS_REST_PATH
+                                       .replace(":jobid", graph.getJobID().toString())
+                                       .replace(":vertexid", task.getJobVertexId().toString());
+                               archive.add(new ArchivedJson(path, json));
+                       }
+                       return archive;
+               }
+       }
+
+       public static String createVertexDetailsJson(
+                       AccessExecutionJobVertex jobVertex,
+                       String jobID,
+                       @Nullable MetricFetcher fetcher) throws IOException {
+               final long now = System.currentTimeMillis();
+
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               gen.writeStartObject();
+
+               gen.writeStringField("id", jobVertex.getJobVertexId().toString());
+               gen.writeStringField("name", jobVertex.getName());
+               gen.writeNumberField("parallelism", jobVertex.getParallelism());
+               gen.writeNumberField("now", now);
+
+               gen.writeArrayFieldStart("subtasks");
+               int num = 0;
+               for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
+                       final ExecutionState status = vertex.getExecutionState();
+
+                       TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
+                       String locationString = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
+
+                       long startTime = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+                       if (startTime == 0) {
+                               startTime = -1;
+                       }
+                       long endTime = status.isTerminal() ? vertex.getStateTimestamp(status) : -1;
+                       long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
+
+                       gen.writeStartObject();
+                       gen.writeNumberField("subtask", num);
+                       gen.writeStringField("status", status.name());
+                       gen.writeNumberField("attempt", vertex.getCurrentExecutionAttempt().getAttemptNumber());
+                       gen.writeStringField("host", locationString);
+                       gen.writeNumberField("start-time", startTime);
+                       gen.writeNumberField("end-time", endTime);
+                       gen.writeNumberField("duration", duration);
+
+                       MutableIOMetrics counts = new MutableIOMetrics();
+
+                       counts.addIOMetrics(
+                               vertex.getCurrentExecutionAttempt(),
+                               fetcher,
+                               jobID,
+                               jobVertex.getJobVertexId().toString()
+                       );
+
+                       counts.writeIOMetricsAsJson(gen);
+
+                       gen.writeEndObject();
+
+                       num++;
+               }
+               gen.writeEndArray();
+
+               gen.writeEndObject();
+
+               gen.close();
+               return writer.toString();
+       }
+}
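
A sketch of the per-subtask document written above (illustrative values; the I/O metric fields added by MutableIOMetrics are omitted):

    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "name": "Map",
      "parallelism": 2,
      "now": 1499999999999,
      "subtasks": [
        {
          "subtask": 0,
          "status": "RUNNING",
          "attempt": 0,
          "host": "worker-1:42424",
          "start-time": 1499999990000,
          "end-time": -1,
          "duration": 9999
        }
      ]
    }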

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
new file mode 100644
index 0000000..fa4ab67
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * A request handler that provides the details of a job vertex, including id, name, and the
+ * runtime and metrics of all its subtasks aggregated by TaskManager.
+ */
+public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandler {
+
+       private static final String JOB_VERTEX_TASKMANAGERS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/taskmanagers";
+
+       private final MetricFetcher fetcher;
+
+       public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+               super(executionGraphHolder, executor);
+               this.fetcher = fetcher;
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{JOB_VERTEX_TASKMANAGERS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could not create TaskManager json.", e);
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Archivist for JobVertexTaskManagersHandler.
+        */
+       public static class JobVertexTaskManagersJsonArchivist implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
+                               String json = createVertexDetailsByTaskManagerJson(task, graph.getJobID().toString(), null);
+                               String path = JOB_VERTEX_TASKMANAGERS_REST_PATH
+                                       .replace(":jobid", graph.getJobID().toString())
+                                       .replace(":vertexid", task.getJobVertexId().toString());
+                               archive.add(new ArchivedJson(path, json));
+                       }
+                       return archive;
+               }
+       }
+
+       public static String createVertexDetailsByTaskManagerJson(
+                       AccessExecutionJobVertex jobVertex,
+                       String jobID,
+                       @Nullable MetricFetcher fetcher) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               // Build a map that groups tasks by TaskManager
+               Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
+
+               for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
+                       TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
+                       String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
+
+                       List<AccessExecutionVertex> vertices = taskManagerVertices.get(taskManager);
+
+                       if (vertices == null) {
+                               vertices = new ArrayList<>();
+                               taskManagerVertices.put(taskManager, vertices);
+                       }
+
+                       vertices.add(vertex);
+               }
+
+               // Build JSON response
+               final long now = System.currentTimeMillis();
+
+               gen.writeStartObject();
+
+               gen.writeStringField("id", jobVertex.getJobVertexId().toString());
+               gen.writeStringField("name", jobVertex.getName());
+               gen.writeNumberField("now", now);
+
+               gen.writeArrayFieldStart("taskmanagers");
+               for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
+                       String host = entry.getKey();
+                       List<AccessExecutionVertex> taskVertices = entry.getValue();
+
+                       int[] tasksPerState = new int[ExecutionState.values().length];
+
+                       long startTime = Long.MAX_VALUE;
+                       long endTime = 0;
+                       boolean allFinished = true;
+
+                       MutableIOMetrics counts = new MutableIOMetrics();
+
+                       for (AccessExecutionVertex vertex : taskVertices) {
+                               final ExecutionState state = vertex.getExecutionState();
+                               tasksPerState[state.ordinal()]++;
+
+                               // take the earliest start time
+                               long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+                               if (started > 0) {
+                                       startTime = Math.min(startTime, started);
+                               }
+
+                               allFinished &= state.isTerminal();
+                               endTime = Math.max(endTime, vertex.getStateTimestamp(state));
+
+                               counts.addIOMetrics(
+                                       vertex.getCurrentExecutionAttempt(),
+                                       fetcher,
+                                       jobID,
+                                       jobVertex.getJobVertexId().toString());
+                       }
+
+                       long duration;
+                       if (startTime < Long.MAX_VALUE) {
+                               if (allFinished) {
+                                       duration = endTime - startTime;
+                               }
+                               else {
+                                       endTime = -1L;
+                                       duration = now - startTime;
+                               }
+                       }
+                       else {
+                               startTime = -1L;
+                               endTime = -1L;
+                               duration = -1L;
+                       }
+
+                       ExecutionState jobVertexState =
+                               ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size());
+
+                       gen.writeStartObject();
+
+                       gen.writeStringField("host", host);
+                       gen.writeStringField("status", jobVertexState.name());
+
+                       gen.writeNumberField("start-time", startTime);
+                       gen.writeNumberField("end-time", endTime);
+                       gen.writeNumberField("duration", duration);
+
+                       counts.writeIOMetricsAsJson(gen);
+
+                       gen.writeObjectFieldStart("status-counts");
+                       for (ExecutionState state : ExecutionState.values()) {
+                               gen.writeNumberField(state.name(), tasksPerState[state.ordinal()]);
+                       }
+                       gen.writeEndObject();
+
+                       gen.writeEndObject();
+               }
+               gen.writeEndArray();
+
+               gen.writeEndObject();
+
+               gen.close();
+               return writer.toString();
+       }
+}
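
The per-TaskManager aggregation written above has roughly this shape (illustrative values; I/O metric fields omitted; "status-counts" carries one entry per ExecutionState, only a few shown here):

    {
      "id": "cbc357ccb763df2852fee8c4fc7d55f2",
      "name": "Map",
      "now": 1499999999999,
      "taskmanagers": [
        {
          "host": "worker-1:42424",
          "status": "RUNNING",
          "start-time": 1499999990000,
          "end-time": -1,
          "duration": 9999,
          "status-counts": { "CREATED": 0, "RUNNING": 2, "FINISHED": 0 }
        }
      ]
    }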

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JsonFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JsonFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JsonFactory.java
new file mode 100644
index 0000000..e0629f3
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JsonFactory.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+/**
+ * A holder for the singleton Jackson JSON factory. Since Jackson's JSON factory
+ * is a heavyweight object that is encouraged to be shared, we use a singleton instance
+ * across all request handlers.
+ */
+public class JsonFactory {
+
+       /** The singleton Jackson JSON factory. */
+       public static final com.fasterxml.jackson.core.JsonFactory JACKSON_FACTORY =
+                       new com.fasterxml.jackson.core.JsonFactory();
+
+       // --------------------------------------------------------------------------------------------
+
+       private JsonFactory() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandler.java
new file mode 100644
index 0000000..d9fc899
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandler.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * Base interface for all request handlers.
+ *
+ * <p>Most handlers will want to use the {@link AbstractJsonRequestHandler}
+ * as a starting point, which produces a valid HTTP response.
+ */
+public interface RequestHandler {
+
+       /**
+        * Core method that handles the request and generates the response. The method needs to
+        * respond with a full http response, including content-type, content-length, etc.
+        *
+        * <p>Exceptions may be thrown and will be handled.
+        *
+        * @param pathParams The map of REST path parameters, decoded by the router.
+        * @param queryParams The map of query parameters.
+        * @param jobManagerGateway to talk to the JobManager.
+        *
+        * @return The full http response.
+        */
+       CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway);
+
+       /**
+        * Returns an array of REST URLs under which this handler can be registered.
+        *
+        * @return array containing REST URLs under which this handler can be registered.
+        */
+       String[] getPaths();
+}
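
A minimal sketch of an implementation, for illustration only (the handler name and response construction are hypothetical; real handlers should extend AbstractJsonRequestHandler, which takes care of building the HTTP response):

    import org.apache.flink.runtime.jobmaster.JobManagerGateway;

    import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
    import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpResponse;
    import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
    import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;

    /** Hypothetical handler that answers every request with an empty JSON object. */
    public class PingHandler implements RequestHandler {

        @Override
        public String[] getPaths() {
            return new String[]{"/ping"};
        }

        @Override
        public CompletableFuture<FullHttpResponse> handleRequest(
                Map<String, String> pathParams,
                Map<String, String> queryParams,
                JobManagerGateway jobManagerGateway) {

            byte[] bytes = "{}".getBytes(StandardCharsets.UTF_8);
            DefaultFullHttpResponse response = new DefaultFullHttpResponse(
                HttpVersion.HTTP_1_1, HttpResponseStatus.OK, Unpooled.wrappedBuffer(bytes));
            // the contract requires a complete response, including content headers
            response.headers().set("Content-Type", "application/json; charset=UTF-8");
            response.headers().set("Content-Length", bytes.length);
            return CompletableFuture.completedFuture(response);
        }
    }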

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandlerException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandlerException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandlerException.java
new file mode 100644
index 0000000..0dad905
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/RequestHandlerException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+/**
+ * Base class for request handler exceptions.
+ */
+public class RequestHandlerException extends Exception {
+
+       private static final long serialVersionUID = 7570352908725875886L;
+
+       public RequestHandlerException(String message) {
+               super(message);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java
new file mode 100644
index 0000000..ff4fb46
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler providing details about the current execution attempt of a 
subtask.
+ */
+public class SubtaskCurrentAttemptDetailsHandler extends 
SubtaskExecutionAttemptDetailsHandler {
+
+       public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum";
+
+       public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+               super(executionGraphHolder, executor, fetcher);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionVertex 
vertex, Map<String, String> params) {
+               return handleRequest(vertex.getCurrentExecutionAttempt(), 
params);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
new file mode 100644
index 0000000..be4fe0b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the accumulators for a single subtask 
execution attempt.
+ */
+public class SubtaskExecutionAttemptAccumulatorsHandler extends 
AbstractSubtaskAttemptRequestHandler {
+
+       private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators";
+
+       public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecution 
execAttempt, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return 
createAttemptAccumulatorsJson(execAttempt);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create accumulator json.", e);
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Archivist for the SubtaskExecutionAttemptAccumulatorsHandler.
+        */
+       public static class SubtaskExecutionAttemptAccumulatorsJsonArchivist 
implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
+                               for (AccessExecutionVertex subtask : 
task.getTaskVertices()) {
+                                       String curAttemptJson = 
createAttemptAccumulatorsJson(subtask.getCurrentExecutionAttempt());
+                                       String curAttemptPath = 
SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH
+                                               .replace(":jobid", 
graph.getJobID().toString())
+                                               .replace(":vertexid", 
task.getJobVertexId().toString())
+                                               .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                               .replace(":attempt", 
String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber()));
+
+                                       archive.add(new 
ArchivedJson(curAttemptPath, curAttemptJson));
+
+                                       for (int x = 0; x < 
subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
+                                               AccessExecution attempt = 
subtask.getPriorExecutionAttempt(x);
+                                               String json = 
createAttemptAccumulatorsJson(attempt);
+                                               String path = 
SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH
+                                                       .replace(":jobid", 
graph.getJobID().toString())
+                                                       .replace(":vertexid", 
task.getJobVertexId().toString())
+                                                       .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                                       .replace(":attempt", 
String.valueOf(attempt.getAttemptNumber()));
+                                               archive.add(new 
ArchivedJson(path, json));
+                                       }
+                               }
+                       }
+                       return archive;
+               }
+       }
+
+       public static String createAttemptAccumulatorsJson(AccessExecution 
execAttempt) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               final StringifiedAccumulatorResult[] accs = 
execAttempt.getUserAccumulatorsStringified();
+
+               gen.writeStartObject();
+
+               gen.writeNumberField("subtask", 
execAttempt.getParallelSubtaskIndex());
+               gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
+               gen.writeStringField("id", 
execAttempt.getAttemptId().toString());
+
+               gen.writeArrayFieldStart("user-accumulators");
+               for (StringifiedAccumulatorResult acc : accs) {
+                       gen.writeStartObject();
+                       gen.writeStringField("name", acc.getName());
+                       gen.writeStringField("type", acc.getType());
+                       gen.writeStringField("value", acc.getValue());
+                       gen.writeEndObject();
+               }
+               gen.writeEndArray();
+
+               gen.writeEndObject();
+
+               gen.close();
+               return writer.toString();
+       }
+}
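
As a usage illustration, the following standalone sketch queries this endpoint over plain HTTP and prints the JSON, filling in the path template with the same String.replace pattern the archivist uses. The web monitor address and all IDs are placeholders, not values taken from this change:

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class FetchAttemptAccumulators {

	public static void main(String[] args) throws Exception {
		// placeholders: substitute a real job id, vertex id, subtask index and attempt number
		String path = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators"
			.replace(":jobid", "a5f5cbcf0a45e45b2d06d569bcb7ece4")
			.replace(":vertexid", "cbc357ccb763df2852fee8c4fc7d55f2")
			.replace(":subtasknum", "0")
			.replace(":attempt", "0");

		// assumes a web monitor listening on localhost:8081
		HttpURLConnection conn = (HttpURLConnection) new URL("http://localhost:8081" + path).openConnection();
		try (InputStream in = conn.getInputStream()) {
			ByteArrayOutputStream buf = new ByteArrayOutputStream();
			byte[] chunk = new byte[4096];
			int n;
			while ((n = in.read(chunk)) != -1) {
				buf.write(chunk, 0, n);
			}
			System.out.println(buf.toString("UTF-8"));
		}
	}
}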

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
new file mode 100644
index 0000000..83a8793
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.runtime.rest.handler.legacy.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH;
+
+/**
+ * Request handler providing details about a single task execution attempt.
+ */
+public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemptRequestHandler {
+
+       private static final String SUBTASK_ATTEMPT_DETAILS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt";
+
+       private final MetricFetcher fetcher;
+
+       public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor, MetricFetcher fetcher) {
+               super(executionGraphHolder, executor);
+               this.fetcher = fetcher;
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{SUBTASK_ATTEMPT_DETAILS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecution 
execAttempt, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return 
createAttemptDetailsJson(execAttempt, params.get("jobid"), 
params.get("vertexid"), fetcher);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create attempt details json.", e);
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Archivist for the SubtaskExecutionAttemptDetailsHandler.
+        */
+       public static class SubtaskExecutionAttemptDetailsJsonArchivist 
implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
+                               for (AccessExecutionVertex subtask : 
task.getTaskVertices()) {
+                                       String curAttemptJson = 
createAttemptDetailsJson(subtask.getCurrentExecutionAttempt(), 
graph.getJobID().toString(), task.getJobVertexId().toString(), null);
+                                       String curAttemptPath1 = 
SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH
+                                               .replace(":jobid", 
graph.getJobID().toString())
+                                               .replace(":vertexid", 
task.getJobVertexId().toString())
+                                               .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()));
+                                       String curAttemptPath2 = 
SUBTASK_ATTEMPT_DETAILS_REST_PATH
+                                               .replace(":jobid", 
graph.getJobID().toString())
+                                               .replace(":vertexid", 
task.getJobVertexId().toString())
+                                               .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                               .replace(":attempt", 
String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber()));
+
+                                       archive.add(new 
ArchivedJson(curAttemptPath1, curAttemptJson));
+                                       archive.add(new 
ArchivedJson(curAttemptPath2, curAttemptJson));
+
+                                       for (int x = 0; x < 
subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
+                                               AccessExecution attempt = 
subtask.getPriorExecutionAttempt(x);
+                                               String json = 
createAttemptDetailsJson(attempt, graph.getJobID().toString(), 
task.getJobVertexId().toString(), null);
+                                               String path = 
SUBTASK_ATTEMPT_DETAILS_REST_PATH
+                                                       .replace(":jobid", 
graph.getJobID().toString())
+                                                       .replace(":vertexid", 
task.getJobVertexId().toString())
+                                                       .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                                       .replace(":attempt", 
String.valueOf(attempt.getAttemptNumber()));
+                                               archive.add(new 
ArchivedJson(path, json));
+                                       }
+                               }
+                       }
+                       return archive;
+               }
+       }
+
+       public static String createAttemptDetailsJson(
+                       AccessExecution execAttempt,
+                       String jobID,
+                       String vertexID,
+                       @Nullable MetricFetcher fetcher) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               final ExecutionState status = execAttempt.getState();
+               final long now = System.currentTimeMillis();
+
+               TaskManagerLocation location = 
execAttempt.getAssignedResourceLocation();
+               String locationString = location == null ? "(unassigned)" : 
location.getHostname();
+
+               long startTime = 
execAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
+               if (startTime == 0) {
+                       startTime = -1;
+               }
+               long endTime = status.isTerminal() ? 
execAttempt.getStateTimestamp(status) : -1;
+               long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) 
- startTime) : -1;
+
+               gen.writeStartObject();
+               gen.writeNumberField("subtask", 
execAttempt.getParallelSubtaskIndex());
+               gen.writeStringField("status", status.name());
+               gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
+               gen.writeStringField("host", locationString);
+               gen.writeNumberField("start-time", startTime);
+               gen.writeNumberField("end-time", endTime);
+               gen.writeNumberField("duration", duration);
+
+               MutableIOMetrics counts = new MutableIOMetrics();
+
+               counts.addIOMetrics(
+                       execAttempt,
+                       fetcher,
+                       jobID,
+                       vertexID
+               );
+
+               counts.writeIOMetricsAsJson(gen);
+
+               gen.writeEndObject();
+
+               gen.close();
+               return writer.toString();
+       }
+}
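
The timing fields above follow a small sentinel convention that is easy to miss: a missing DEPLOYING timestamp (0) is reported as a start-time of -1, a non-terminal attempt has an end-time of -1, and a still-running attempt's duration is measured against the current time. A standalone sketch of just that logic (the method name is ours, not from the change):

// Sketch of the start/end/duration convention used by createAttemptDetailsJson.
// deployingTimestamp == 0 means the attempt was never deployed.
static long[] attemptTimes(long deployingTimestamp, boolean terminal, long terminalTimestamp, long now) {
	long startTime = deployingTimestamp == 0 ? -1 : deployingTimestamp;
	long endTime = terminal ? terminalTimestamp : -1;
	long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
	return new long[]{startTime, endTime, duration};
}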

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
new file mode 100644
index 0000000..6d0757d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the accumulators for all subtasks of a job 
vertex.
+ */
+public class SubtasksAllAccumulatorsHandler extends 
AbstractJobVertexRequestHandler {
+
+       private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasks/accumulators";
+
+       public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{SUBTASKS_ALL_ACCUMULATORS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex 
jobVertex, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return 
createSubtasksAccumulatorsJson(jobVertex);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create subtasks accumulator json.", e);
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Archivist for the SubtasksAllAccumulatorsHandler.
+        */
+       public static class SubtasksAllAccumulatorsJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
+                               String json = 
createSubtasksAccumulatorsJson(task);
+                               String path = 
SUBTASKS_ALL_ACCUMULATORS_REST_PATH
+                                       .replace(":jobid", 
graph.getJobID().toString())
+                                       .replace(":vertexid", 
task.getJobVertexId().toString());
+                               archive.add(new ArchivedJson(path, json));
+                       }
+                       return archive;
+               }
+       }
+
+       public static String 
createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws 
IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               gen.writeStartObject();
+               gen.writeStringField("id", 
jobVertex.getJobVertexId().toString());
+               gen.writeNumberField("parallelism", jobVertex.getParallelism());
+
+               gen.writeArrayFieldStart("subtasks");
+
+               int num = 0;
+               for (AccessExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
+
+                       TaskManagerLocation location = 
vertex.getCurrentAssignedResourceLocation();
+                       String locationString = location == null ? 
"(unassigned)" : location.getHostname();
+
+                       gen.writeStartObject();
+
+                       gen.writeNumberField("subtask", num++);
+                       gen.writeNumberField("attempt", 
vertex.getCurrentExecutionAttempt().getAttemptNumber());
+                       gen.writeStringField("host", locationString);
+
+                       StringifiedAccumulatorResult[] accs = 
vertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified();
+                       gen.writeArrayFieldStart("user-accumulators");
+                       for (StringifiedAccumulatorResult acc : accs) {
+                               gen.writeStartObject();
+                               gen.writeStringField("name", acc.getName());
+                               gen.writeStringField("type", acc.getType());
+                               gen.writeStringField("value", acc.getValue());
+                               gen.writeEndObject();
+                       }
+                       gen.writeEndArray();
+
+                       gen.writeEndObject();
+               }
+               gen.writeEndArray();
+
+               gen.writeEndObject();
+               gen.close();
+               return writer.toString();
+       }
+}
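
The field names in this document ("subtasks", "subtask", "user-accumulators", "name", "type", "value") come straight from the generator above, so a consumer could walk the result with Jackson's tree model. A sketch, assuming the JSON response string is already in hand:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class PrintSubtaskAccumulators {

	public static void main(String[] args) throws Exception {
		// e.g. the response of /jobs/:jobid/vertices/:vertexid/subtasks/accumulators
		String json = args[0];
		JsonNode root = new ObjectMapper().readTree(json);
		for (JsonNode subtask : root.get("subtasks")) {
			int index = subtask.get("subtask").asInt();
			for (JsonNode acc : subtask.get("user-accumulators")) {
				System.out.printf("subtask %d: %s (%s) = %s%n",
					index, acc.get("name").asText(), acc.get("type").asText(), acc.get("value").asText());
			}
		}
	}
}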

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
new file mode 100644
index 0000000..13fdc16
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns the state transition timestamps for all 
subtasks, plus their
+ * location and duration.
+ */
+public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
+
+       private static final String SUBTASK_TIMES_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasktimes";
+
+       public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder, 
Executor executor) {
+               super(executionGraphHolder, executor);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{SUBTASK_TIMES_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex 
jobVertex, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return 
createSubtaskTimesJson(jobVertex);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not write subtask time json.", e);
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Archivist for the SubtasksTimesHandler.
+        */
+       public static class SubtasksTimesJsonArchivist implements JsonArchivist 
{
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
+                               String json = createSubtaskTimesJson(task);
+                               String path = SUBTASK_TIMES_REST_PATH
+                                       .replace(":jobid", 
graph.getJobID().toString())
+                                       .replace(":vertexid", 
task.getJobVertexId().toString());
+                               archive.add(new ArchivedJson(path, json));
+                       }
+                       return archive;
+               }
+       }
+
+       public static String createSubtaskTimesJson(AccessExecutionJobVertex 
jobVertex) throws IOException {
+               final long now = System.currentTimeMillis();
+
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               gen.writeStartObject();
+
+               gen.writeStringField("id", 
jobVertex.getJobVertexId().toString());
+               gen.writeStringField("name", jobVertex.getName());
+               gen.writeNumberField("now", now);
+
+               gen.writeArrayFieldStart("subtasks");
+
+               int num = 0;
+               for (AccessExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
+
+                       long[] timestamps = 
vertex.getCurrentExecutionAttempt().getStateTimestamps();
+                       ExecutionState status = vertex.getExecutionState();
+
+                       long scheduledTime = 
timestamps[ExecutionState.SCHEDULED.ordinal()];
+
+                       long start = scheduledTime > 0 ? scheduledTime : -1;
+                       long end = status.isTerminal() ? 
timestamps[status.ordinal()] : now;
+                       long duration = start >= 0 ? end - start : -1L;
+
+                       gen.writeStartObject();
+                       gen.writeNumberField("subtask", num++);
+
+                       TaskManagerLocation location = 
vertex.getCurrentAssignedResourceLocation();
+                       String locationString = location == null ? 
"(unassigned)" : location.getHostname();
+                       gen.writeStringField("host", locationString);
+
+                       gen.writeNumberField("duration", duration);
+
+                       gen.writeObjectFieldStart("timestamps");
+                       for (ExecutionState state : ExecutionState.values()) {
+                               gen.writeNumberField(state.name(), 
timestamps[state.ordinal()]);
+                       }
+                       gen.writeEndObject();
+
+                       gen.writeEndObject();
+               }
+
+               gen.writeEndArray();
+               gen.writeEndObject();
+
+               gen.close();
+               return writer.toString();
+       }
+}
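
The "timestamps" object written above has one numeric field per ExecutionState, taken from an array indexed by the state's ordinal; a zero entry means the state was never reached. A small sketch of reading such an array back (the method name is ours):

import org.apache.flink.runtime.execution.ExecutionState;

// Mirror of the loop in createSubtaskTimesJson, reading the per-state
// timestamp array. A value of 0 means the state was never reached.
static void printStateTimestamps(long[] timestamps) {
	for (ExecutionState state : ExecutionState.values()) {
		long ts = timestamps[state.ordinal()];
		System.out.println(state.name() + " -> " + (ts == 0 ? "(never reached)" : String.valueOf(ts)));
	}
}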

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
new file mode 100644
index 0000000..718657e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+/*****************************************************************************
+ * This code is based on the "HttpStaticFileServerHandler" from the
+ * Netty project's HTTP server example.
+ *
+ * See http://netty.io and
+ * 
https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
+ *****************************************************************************/
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobCache;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.BlobView;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.RedirectHandler;
+import org.apache.flink.runtime.rest.handler.WebHandler;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.StringUtils;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
+import 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.net.InetSocketAddress;
+import java.nio.channels.FileChannel;
+import java.util.HashMap;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns the TaskManager log/out files.
+ *
+ * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty 
project's HTTP server
+ * example.</p>
+ */
+@ChannelHandler.Sharable
+public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> 
implements WebHandler {
+       private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerLogHandler.class);
+
+       private static final String TASKMANAGER_LOG_REST_PATH = 
"/taskmanagers/:taskmanagerid/log";
+       private static final String TASKMANAGER_OUT_REST_PATH = 
"/taskmanagers/:taskmanagerid/stdout";
+
+       /** Keep track of last transmitted log, to clean up old ones. */
+       private final HashMap<String, BlobKey> lastSubmittedLog = new 
HashMap<>();
+       private final HashMap<String, BlobKey> lastSubmittedStdout = new 
HashMap<>();
+
+       /** Keep track of request status, prevents multiple log requests for a 
single TM running concurrently. */
+       private final ConcurrentHashMap<String, Boolean> lastRequestPending = 
new ConcurrentHashMap<>();
+       private final Configuration config;
+
+       /** Future of the blob cache. */
+       private CompletableFuture<BlobCache> cache;
+
+       /** Indicates which log file should be displayed. */
+       private FileMode fileMode;
+
+       private final Executor executor;
+
+       private final BlobView blobView;
+
+       /** Used to control whether this handler serves the .log or .out file. 
*/
+       public enum FileMode {
+               LOG,
+               STDOUT
+       }
+
+       public TaskManagerLogHandler(
+               GatewayRetriever<JobManagerGateway> retriever,
+               Executor executor,
+               CompletableFuture<String> localJobManagerAddressPromise,
+               Time timeout,
+               FileMode fileMode,
+               Configuration config,
+               BlobView blobView) {
+               super(localJobManagerAddressPromise, retriever, timeout);
+
+               this.executor = checkNotNull(executor);
+               this.config = config;
+               this.fileMode = fileMode;
+
+               this.blobView = Preconditions.checkNotNull(blobView, 
"blobView");
+       }
+
+       @Override
+       public String[] getPaths() {
+               switch (fileMode) {
+                       case LOG:
+                               return new String[]{TASKMANAGER_LOG_REST_PATH};
+                       case STDOUT:
+                       default:
+                               return new String[]{TASKMANAGER_OUT_REST_PATH};
+               }
+       }
+
+       /**
+        * Response when running with leading JobManager.
+        */
+       @Override
+       protected void respondAsLeader(final ChannelHandlerContext ctx, final 
Routed routed, final JobManagerGateway jobManagerGateway) {
+               if (cache == null) {
+                       CompletableFuture<Integer> blobPortFuture = 
jobManagerGateway.requestBlobServerPort(timeout);
+                       cache = blobPortFuture.thenApplyAsync(
+                               (Integer port) -> {
+                                       try {
+                                               return new BlobCache(new 
InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
+                                       } catch (IOException e) {
+                                               throw new 
FlinkFutureException("Could not create BlobCache.", e);
+                                       }
+                               },
+                               executor);
+               }
+
+               final String taskManagerID = 
routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
+               final HttpRequest request = routed.request();
+
+               //fetch TaskManager logs if no other process is currently doing 
it
+               if (lastRequestPending.putIfAbsent(taskManagerID, true) == 
null) {
+                       try {
+                               InstanceID instanceID = new 
InstanceID(StringUtils.hexStringToByte(taskManagerID));
+                               CompletableFuture<Optional<Instance>> 
taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, 
timeout);
+
+                               CompletableFuture<BlobKey> blobKeyFuture = 
taskManagerFuture.thenCompose(
+                                       (Optional<Instance> optTMInstance) -> {
+                                               Instance taskManagerInstance = 
optTMInstance.orElseThrow(
+                                                       () -> new 
FlinkFutureException("Could not find instance with " + instanceID + '.'));
+                                               switch (fileMode) {
+                                                       case LOG:
+                                                               return 
taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout);
+                                                       case STDOUT:
+                                                       default:
+                                                               return 
taskManagerInstance.getTaskManagerGateway().requestTaskManagerStdout(timeout);
+                                               }
+                                       }
+                               );
+
+                               CompletableFuture<String> logPathFuture = 
blobKeyFuture
+                                       .thenCombineAsync(
+                                               cache,
+                                               (blobKey, blobCache) -> {
+                                                       //delete previous log 
file, if it is different than the current one
+                                                       HashMap<String, 
BlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? lastSubmittedLog : 
lastSubmittedStdout;
+                                                       if 
(lastSubmittedFile.containsKey(taskManagerID)) {
+                                                               if 
(!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerID))) {
+                                                                       try {
+                                                                               
blobCache.deleteGlobal(lastSubmittedFile.get(taskManagerID));
+                                                                       } catch 
(IOException e) {
+                                                                               
throw new FlinkFutureException("Could not delete file for " + taskManagerID + 
'.', e);
+                                                                       }
+                                                                       
lastSubmittedFile.put(taskManagerID, blobKey);
+                                                               }
+                                                       } else {
+                                                               
lastSubmittedFile.put(taskManagerID, blobKey);
+                                                       }
+                                                       try {
+                                                               return 
blobCache.getFile(blobKey).getAbsolutePath();
+                                                       } catch (IOException e) 
{
+                                                               throw new 
FlinkFutureException("Could not retrieve blob for " + blobKey + '.', e);
+                                                       }
+                                               },
+                                               executor);
+
+                               logPathFuture.exceptionally(
+                                       failure -> {
+                                               display(ctx, request, "Fetching 
TaskManager log failed.");
+                                               LOG.error("Fetching TaskManager 
log failed.", failure);
+                                               
lastRequestPending.remove(taskManagerID);
+
+                                               return null;
+                                       });
+
+                               logPathFuture.thenAccept(
+                                       filePath -> {
+                                               File file = new File(filePath);
+                                               final RandomAccessFile raf;
+                                               try {
+                                                       raf = new 
RandomAccessFile(file, "r");
+                                               } catch (FileNotFoundException 
e) {
+                                                       display(ctx, request, 
"Displaying TaskManager log failed.");
+                                                       LOG.error("Displaying 
TaskManager log failed.", e);
+
+                                                       return;
+                                               }
+                                               long fileLength;
+                                               try {
+                                                       fileLength = 
raf.length();
+                                               } catch (IOException ioe) {
+                                                       display(ctx, request, 
"Displaying TaskManager log failed.");
+                                                       LOG.error("Displaying 
TaskManager log failed.", ioe);
+                                                       try {
+                                                               raf.close();
+                                                       } catch (IOException e) 
{
+                                                               
LOG.error("Could not close random access file.", e);
+                                                       }
+
+                                                       return;
+                                               }
+                                               final FileChannel fc = 
raf.getChannel();
+
+                                               HttpResponse response = new 
DefaultHttpResponse(HTTP_1_1, OK);
+                                               
response.headers().set(CONTENT_TYPE, "text/plain");
+
+                                               if 
(HttpHeaders.isKeepAlive(request)) {
+                                                       
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+                                               }
+                                               
HttpHeaders.setContentLength(response, fileLength);
+
+                                               // write the initial line and 
the header.
+                                               ctx.write(response);
+
+                                               // write the content.
+                                               ChannelFuture lastContentFuture;
+                                               final 
GenericFutureListener<Future<? super Void>> completionListener = future -> {
+                                                       
lastRequestPending.remove(taskManagerID);
+                                                       fc.close();
+                                                       raf.close();
+                                               };
+                                               if 
(ctx.pipeline().get(SslHandler.class) == null) {
+                                                       ctx.write(
+                                                               new 
DefaultFileRegion(fc, 0, fileLength), ctx.newProgressivePromise())
+                                                                       
.addListener(completionListener);
+                                                       lastContentFuture = 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+
+                                               } else {
+                                                       try {
+                                                               
lastContentFuture = ctx.writeAndFlush(
+                                                                       new 
HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
+                                                                       
ctx.newProgressivePromise())
+                                                                       
.addListener(completionListener);
+                                                       } catch (IOException e) 
{
+                                                               display(ctx, 
request, "Displaying TaskManager log failed.");
+                                                               LOG.warn("Could 
not write http data.", e);
+
+                                                               return;
+                                                       }
+                                                       // HttpChunkedInput 
will write the end marker (LastHttpContent) for us.
+                                               }
+
+                                               // close the connection, if no 
keep-alive is needed
+                                               if 
(!HttpHeaders.isKeepAlive(request)) {
+                                                       
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+                                               }
+                                       });
+                       } catch (Exception e) {
+                               display(ctx, request, "Error: " + 
e.getMessage());
+                               LOG.error("Fetching TaskManager log failed.", 
e);
+                               lastRequestPending.remove(taskManagerID);
+                       }
+               } else {
+                       display(ctx, request, "loading...");
+               }
+       }
+
+       private void display(ChannelHandlerContext ctx, HttpRequest request, 
String message) {
+               HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+               response.headers().set(CONTENT_TYPE, "text/plain");
+
+               if (HttpHeaders.isKeepAlive(request)) {
+                       response.headers().set(CONNECTION, 
HttpHeaders.Values.KEEP_ALIVE);
+               }
+
+               byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
+
+               ByteBuf b = Unpooled.copiedBuffer(buf);
+
+               HttpHeaders.setContentLength(response, buf.length);
+
+               // write the initial line and the header.
+               ctx.write(response);
+
+               ctx.write(b);
+
+               ChannelFuture lastContentFuture = 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
+
+               // close the connection, if no keep-alive is needed
+               if (!HttpHeaders.isKeepAlive(request)) {
+                       
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
+               }
+       }
+}
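
The lastRequestPending map above implements a "single flight" guard: putIfAbsent admits only the first request per TaskManager, and the flag is removed again on the asynchronous completion and error paths. A synchronous standalone sketch of the same pattern (class and method names are ours; the real handler clears the flag from async callbacks rather than a finally block):

import java.util.concurrent.ConcurrentHashMap;

public class SingleFlightGuard {

	private final ConcurrentHashMap<String, Boolean> pending = new ConcurrentHashMap<>();

	/** Runs the fetch only if no fetch for this key is already in flight. */
	public boolean tryRun(String key, Runnable fetch) {
		if (pending.putIfAbsent(key, true) != null) {
			return false; // caller should respond with "loading..."
		}
		try {
			fetch.run();
			return true;
		} finally {
			pending.remove(key);
		}
	}
}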
