http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
deleted file mode 100644
index cb6d8c0..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobManagerConfigHandler.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Returns the Job Manager's configuration.
- */
-public class JobManagerConfigHandler extends AbstractJsonRequestHandler {
-
-       private static final String JOBMANAGER_CONFIG_REST_PATH = 
"/jobmanager/config";
-
-       private final Configuration config;
-
-       public JobManagerConfigHandler(Executor executor, Configuration config) 
{
-               super(executor);
-               this.config = config;
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOBMANAGER_CONFIG_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       StringWriter writer = new 
StringWriter();
-                                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-                                       gen.writeStartArray();
-                                       for (String key : config.keySet()) {
-                                               gen.writeStartObject();
-                                               gen.writeStringField("key", 
key);
-
-                                               // Mask key values which 
contain sensitive information
-                                               if 
(key.toLowerCase().contains("password")) {
-                                                       String value = 
config.getString(key, null);
-                                                       if (value != null) {
-                                                               value = 
"******";
-                                                       }
-                                                       
gen.writeStringField("value", value);
-                                               } else {
-                                                       
gen.writeStringField("value", config.getString(key, null));
-                                               }
-                                               gen.writeEndObject();
-                                       }
-                                       gen.writeEndArray();
-
-                                       gen.close();
-                                       return writer.toString();
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could 
not write configuration.", e);
-                               }
-                       },
-                       executor);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
deleted file mode 100644
index b3a9dd5..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the JSON program plan of a job graph.
- */
-public class JobPlanHandler extends AbstractExecutionGraphRequestHandler {
-
-       private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan";
-
-       public JobPlanHandler(ExecutionGraphHolder executionGraphHolder, 
Executor executor) {
-               super(executionGraphHolder, executor);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOB_PLAN_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
-               return CompletableFuture.completedFuture(graph.getJsonPlan());
-       }
-
-       /**
-        * Archivist for the JobPlanHandler.
-        */
-       public static class JobPlanJsonArchivist implements JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       String path = JOB_PLAN_REST_PATH
-                               .replace(":jobid", graph.getJobID().toString());
-                       String json = graph.getJsonPlan();
-                       return Collections.singletonList(new ArchivedJson(path, 
json));
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
deleted file mode 100644
index f63403f..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobStoppingHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler for the STOP request.
- */
-public class JobStoppingHandler extends AbstractJsonRequestHandler {
-
-       private static final String JOB_STOPPING_REST_PATH = 
"/jobs/:jobid/stop";
-       private static final String JOB_STOPPING_YARN_REST_PATH = 
"/jobs/:jobid/yarn-stop";
-
-       private final Time timeout;
-
-       public JobStoppingHandler(Executor executor, Time timeout) {
-               super(executor);
-               this.timeout = Preconditions.checkNotNull(timeout);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOB_STOPPING_REST_PATH, 
JOB_STOPPING_YARN_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       JobID jobId = new 
JobID(StringUtils.hexStringToByte(pathParams.get("jobid")));
-                                       if (jobManagerGateway != null) {
-                                               
jobManagerGateway.stopJob(jobId, timeout);
-                                               return "{}";
-                                       }
-                                       else {
-                                               throw new Exception("No 
connection to the leading JobManager.");
-                                       }
-                               }
-                               catch (Exception e) {
-                                       throw new FlinkFutureException("Failed 
to stop the job with id: "  + pathParams.get("jobid") + '.', e);
-                               }
-                       },
-                       executor);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
deleted file mode 100644
index 9c613ff..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the accumulators for a given vertex.
- */
-public class JobVertexAccumulatorsHandler extends 
AbstractJobVertexRequestHandler {
-
-       private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/accumulators";
-
-       public JobVertexAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
-               super(executionGraphHolder, executor);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOB_VERTEX_ACCUMULATORS_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex 
jobVertex, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return 
createVertexAccumulatorsJson(jobVertex);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could 
not create job vertex accumulators json.", e);
-                               }
-                       },
-                       executor);
-
-       }
-
-       /**
-        * Archivist for JobVertexAccumulatorsHandler.
-        */
-       public static class JobVertexAccumulatorsJsonArchivist implements 
JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       List<ArchivedJson> archive = new ArrayList<>();
-                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
-                               String json = 
createVertexAccumulatorsJson(task);
-                               String path = JOB_VERTEX_ACCUMULATORS_REST_PATH
-                                       .replace(":jobid", 
graph.getJobID().toString())
-                                       .replace(":vertexid", 
task.getJobVertexId().toString());
-                               archive.add(new ArchivedJson(path, json));
-                       }
-                       return archive;
-               }
-       }
-
-       public static String 
createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws 
IOException {
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-               StringifiedAccumulatorResult[] accs = 
jobVertex.getAggregatedUserAccumulatorsStringified();
-
-               gen.writeStartObject();
-               gen.writeStringField("id", 
jobVertex.getJobVertexId().toString());
-
-               gen.writeArrayFieldStart("user-accumulators");
-               for (StringifiedAccumulatorResult acc : accs) {
-                       gen.writeStartObject();
-                       gen.writeStringField("name", acc.getName());
-                       gen.writeStringField("type", acc.getType());
-                       gen.writeStringField("value", acc.getValue());
-                       gen.writeEndObject();
-               }
-               gen.writeEndArray();
-
-               gen.writeEndObject();
-
-               gen.close();
-               return writer.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
deleted file mode 100644
index 963153f..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexBackPressureHandler.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.webmonitor.BackPressureStatsTracker;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.OperatorBackPressureStats;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import scala.Option;
-
-import static org.apache.flink.util.Preconditions.checkArgument;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Request handler that returns back pressure stats for a single job vertex and
- * all its sub tasks.
- */
-public class JobVertexBackPressureHandler extends 
AbstractJobVertexRequestHandler {
-
-       private static final String JOB_VERTEX_BACKPRESSURE_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/backpressure";
-
-       /** Back pressure stats tracker. */
-       private final BackPressureStatsTracker backPressureStatsTracker;
-
-       /** Time after which stats are considered outdated. */
-       private final int refreshInterval;
-
-       public JobVertexBackPressureHandler(
-                       ExecutionGraphHolder executionGraphHolder,
-                       Executor executor,
-                       BackPressureStatsTracker backPressureStatsTracker,
-                       int refreshInterval) {
-
-               super(executionGraphHolder, executor);
-               this.backPressureStatsTracker = 
checkNotNull(backPressureStatsTracker, "Stats tracker");
-               checkArgument(refreshInterval >= 0, "Negative timeout");
-               this.refreshInterval = refreshInterval;
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOB_VERTEX_BACKPRESSURE_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(
-                       AccessExecutionJobVertex accessJobVertex,
-                       Map<String, String> params) {
-               if (accessJobVertex instanceof ArchivedExecutionJobVertex) {
-                       return CompletableFuture.completedFuture("");
-               }
-               ExecutionJobVertex jobVertex = (ExecutionJobVertex) 
accessJobVertex;
-               try (StringWriter writer = new StringWriter();
-                               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
-
-                       gen.writeStartObject();
-
-                       Option<OperatorBackPressureStats> statsOption = 
backPressureStatsTracker
-                                       
.getOperatorBackPressureStats(jobVertex);
-
-                       if (statsOption.isDefined()) {
-                               OperatorBackPressureStats stats = 
statsOption.get();
-
-                               // Check whether we need to refresh
-                               if (refreshInterval <= 
System.currentTimeMillis() - stats.getEndTimestamp()) {
-                                       
backPressureStatsTracker.triggerStackTraceSample(jobVertex);
-                                       gen.writeStringField("status", 
"deprecated");
-                               } else {
-                                       gen.writeStringField("status", "ok");
-                               }
-
-                               gen.writeStringField("backpressure-level", 
getBackPressureLevel(stats.getMaxBackPressureRatio()));
-                               gen.writeNumberField("end-timestamp", 
stats.getEndTimestamp());
-
-                               // Sub tasks
-                               gen.writeArrayFieldStart("subtasks");
-                               int numSubTasks = stats.getNumberOfSubTasks();
-                               for (int i = 0; i < numSubTasks; i++) {
-                                       double ratio = 
stats.getBackPressureRatio(i);
-
-                                       gen.writeStartObject();
-                                       gen.writeNumberField("subtask", i);
-                                       
gen.writeStringField("backpressure-level", getBackPressureLevel(ratio));
-                                       gen.writeNumberField("ratio", ratio);
-                                       gen.writeEndObject();
-                               }
-                               gen.writeEndArray();
-                       } else {
-                               
backPressureStatsTracker.triggerStackTraceSample(jobVertex);
-                               gen.writeStringField("status", "deprecated");
-                       }
-
-                       gen.writeEndObject();
-                       gen.close();
-
-                       return 
CompletableFuture.completedFuture(writer.toString());
-               } catch (IOException e) {
-                       return FutureUtils.completedExceptionally(e);
-               }
-       }
-
-       /**
-        * Returns the back pressure level as a String.
-        *
-        * @param backPressureRatio Ratio of back pressure samples to total 
number of samples.
-        *
-        * @return Back pressure level ('ok', 'low', or 'high')
-        */
-       static String getBackPressureLevel(double backPressureRatio) {
-               if (backPressureRatio <= 0.10) {
-                       return "ok";
-               } else if (backPressureRatio <= 0.5) {
-                       return "low";
-               } else {
-                       return "high";
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
deleted file mode 100644
index bd1745c..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * A request handler that provides the details of a job vertex, including id, 
name, parallelism,
- * and the runtime and metrics of all its subtasks.
- */
-public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
-
-       private static final String JOB_VERTEX_DETAILS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid";
-
-       private final MetricFetcher fetcher;
-
-       public JobVertexDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor, MetricFetcher fetcher) {
-               super(executionGraphHolder, executor);
-               this.fetcher = fetcher;
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOB_VERTEX_DETAILS_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex 
jobVertex, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return 
createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could 
not write the vertex details json.", e);
-                               }
-                       },
-                       executor);
-       }
-
-       /**
-        * Archivist for the JobVertexDetailsHandler.
-        */
-       public static class JobVertexDetailsJsonArchivist implements 
JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       List<ArchivedJson> archive = new ArrayList<>();
-                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
-                               String json = createVertexDetailsJson(task, 
graph.getJobID().toString(), null);
-                               String path = JOB_VERTEX_DETAILS_REST_PATH
-                                       .replace(":jobid", 
graph.getJobID().toString())
-                                       .replace(":vertexid", 
task.getJobVertexId().toString());
-                               archive.add(new ArchivedJson(path, json));
-                       }
-                       return archive;
-               }
-       }
-
-       public static String createVertexDetailsJson(
-                       AccessExecutionJobVertex jobVertex,
-                       String jobID,
-                       @Nullable MetricFetcher fetcher) throws IOException {
-               final long now = System.currentTimeMillis();
-
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-               gen.writeStartObject();
-
-               gen.writeStringField("id", 
jobVertex.getJobVertexId().toString());
-               gen.writeStringField("name", jobVertex.getName());
-               gen.writeNumberField("parallelism", jobVertex.getParallelism());
-               gen.writeNumberField("now", now);
-
-               gen.writeArrayFieldStart("subtasks");
-               int num = 0;
-               for (AccessExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
-                       final ExecutionState status = 
vertex.getExecutionState();
-
-                       TaskManagerLocation location = 
vertex.getCurrentAssignedResourceLocation();
-                       String locationString = location == null ? 
"(unassigned)" : location.getHostname() + ":" + location.dataPort();
-
-                       long startTime = 
vertex.getStateTimestamp(ExecutionState.DEPLOYING);
-                       if (startTime == 0) {
-                               startTime = -1;
-                       }
-                       long endTime = status.isTerminal() ? 
vertex.getStateTimestamp(status) : -1;
-                       long duration = startTime > 0 ? ((endTime > 0 ? endTime 
: now) - startTime) : -1;
-
-                       gen.writeStartObject();
-                       gen.writeNumberField("subtask", num);
-                       gen.writeStringField("status", status.name());
-                       gen.writeNumberField("attempt", 
vertex.getCurrentExecutionAttempt().getAttemptNumber());
-                       gen.writeStringField("host", locationString);
-                       gen.writeNumberField("start-time", startTime);
-                       gen.writeNumberField("end-time", endTime);
-                       gen.writeNumberField("duration", duration);
-
-                       MutableIOMetrics counts = new MutableIOMetrics();
-
-                       counts.addIOMetrics(
-                               vertex.getCurrentExecutionAttempt(),
-                               fetcher,
-                               jobID,
-                               jobVertex.getJobVertexId().toString()
-                       );
-
-                       counts.writeIOMetricsAsJson(gen);
-
-                       gen.writeEndObject();
-
-                       num++;
-               }
-               gen.writeEndArray();
-
-               gen.writeEndObject();
-
-               gen.close();
-               return writer.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
deleted file mode 100644
index 0827720..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * A request handler that provides the details of a job vertex, including id, name, and the
- * runtime and metrics of all its subtasks aggregated by TaskManager.
- *
- * <p>Subtasks are grouped by the {@code hostname:dataPort} of their currently assigned
- * TaskManager location; subtasks without a location are grouped under {@code (unassigned)}.
- */
-public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandler {
-
-       private static final String JOB_VERTEX_TASKMANAGERS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/taskmanagers";
-
-       /** Metric fetcher that is handed through to the I/O metrics aggregation. */
-       private final MetricFetcher fetcher;
-
-       public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
-               super(executionGraphHolder, executor);
-               this.fetcher = fetcher;
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOB_VERTEX_TASKMANAGERS_REST_PATH};
-       }
-
-       /**
-        * Builds the per-TaskManager JSON for the given job vertex asynchronously on the executor.
-        *
-        * @param jobVertex job vertex to describe
-        * @param params request parameters; the "jobid" entry is forwarded to the JSON creation
-        * @return future holding the JSON string; an {@link IOException} during creation is
-        *         rethrown wrapped in a {@link FlinkFutureException}
-        */
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could not create TaskManager json.", e);
-                               }
-                       },
-                       executor);
-       }
-
-       /**
-        * Archivist for JobVertexTaskManagersHandler.
-        *
-        * <p>Produces one archived JSON per job vertex of the graph, stored under the handler's
-        * REST path with the job id and vertex id filled in. No metric fetcher is used here.
-        */
-       public static class JobVertexTaskManagersJsonArchivist implements JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       List<ArchivedJson> archive = new ArrayList<>();
-                       for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
-                               String json = createVertexDetailsByTaskManagerJson(task, graph.getJobID().toString(), null);
-                               String path = JOB_VERTEX_TASKMANAGERS_REST_PATH
-                                       .replace(":jobid", graph.getJobID().toString())
-                                       .replace(":vertexid", task.getJobVertexId().toString());
-                               archive.add(new ArchivedJson(path, json));
-                       }
-                       return archive;
-               }
-       }
-
-       /**
-        * Renders the per-TaskManager view of a job vertex as a JSON string.
-        *
-        * <p>The document contains the vertex "id" and "name", the timestamp "now" and a
-        * "taskmanagers" array with one entry per TaskManager. Each entry carries the aggregated
-        * "status", "start-time", "end-time", "duration", the I/O metrics and the per-state
-        * subtask counts ("status-counts"). Times that cannot be determined are written as -1.
-        *
-        * @param jobVertex job vertex whose subtasks are summarized
-        * @param jobID id of the owning job, forwarded to the I/O metrics aggregation
-        * @param fetcher metric fetcher forwarded to the I/O metrics aggregation; may be
-        *                {@code null} (the archivist passes {@code null})
-        * @return the JSON document
-        * @throws IOException if writing the JSON fails
-        */
-       public static String createVertexDetailsByTaskManagerJson(
-                       AccessExecutionJobVertex jobVertex,
-                       String jobID,
-                       @Nullable MetricFetcher fetcher) throws IOException {
-               final Map<String, List<AccessExecutionVertex>> taskManagerVertices = groupByTaskManager(jobVertex);
-               final StringWriter writer = new StringWriter();
-
-               // try-with-resources closes the generator even when one of the writes below throws
-               try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
-                       final long now = System.currentTimeMillis();
-
-                       gen.writeStartObject();
-
-                       gen.writeStringField("id", jobVertex.getJobVertexId().toString());
-                       gen.writeStringField("name", jobVertex.getName());
-                       gen.writeNumberField("now", now);
-
-                       gen.writeArrayFieldStart("taskmanagers");
-                       for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
-                               String host = entry.getKey();
-                               List<AccessExecutionVertex> taskVertices = entry.getValue();
-
-                               // indexed by ExecutionState ordinal, as expected by getAggregateJobVertexState
-                               int[] tasksPerState = new int[ExecutionState.values().length];
-
-                               long startTime = Long.MAX_VALUE;
-                               long endTime = 0;
-                               boolean allFinished = true;
-
-                               MutableIOMetrics counts = new MutableIOMetrics();
-
-                               for (AccessExecutionVertex vertex : taskVertices) {
-                                       final ExecutionState state = vertex.getExecutionState();
-                                       tasksPerState[state.ordinal()]++;
-
-                                       // take the earliest start time; a timestamp <= 0 is treated as "not started"
-                                       long started = vertex.getStateTimestamp(ExecutionState.DEPLOYING);
-                                       if (started > 0) {
-                                               startTime = Math.min(startTime, started);
-                                       }
-
-                                       // the end time is the latest timestamp at which a subtask entered its current state
-                                       allFinished &= state.isTerminal();
-                                       endTime = Math.max(endTime, vertex.getStateTimestamp(state));
-
-                                       counts.addIOMetrics(
-                                               vertex.getCurrentExecutionAttempt(),
-                                               fetcher,
-                                               jobID,
-                                               jobVertex.getJobVertexId().toString());
-                               }
-
-                               long duration;
-                               if (startTime < Long.MAX_VALUE) {
-                                       if (allFinished) {
-                                               duration = endTime - startTime;
-                                       }
-                                       else {
-                                               // still running: no end time yet, duration is measured up to "now"
-                                               endTime = -1L;
-                                               duration = now - startTime;
-                                       }
-                               }
-                               else {
-                                       // no subtask has been started yet
-                                       startTime = -1L;
-                                       endTime = -1L;
-                                       duration = -1L;
-                               }
-
-                               ExecutionState jobVertexState =
-                                       ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, taskVertices.size());
-
-                               gen.writeStartObject();
-
-                               gen.writeStringField("host", host);
-                               gen.writeStringField("status", jobVertexState.name());
-
-                               gen.writeNumberField("start-time", startTime);
-                               gen.writeNumberField("end-time", endTime);
-                               gen.writeNumberField("duration", duration);
-
-                               counts.writeIOMetricsAsJson(gen);
-
-                               gen.writeObjectFieldStart("status-counts");
-                               for (ExecutionState state : ExecutionState.values()) {
-                                       gen.writeNumberField(state.name(), tasksPerState[state.ordinal()]);
-                               }
-                               gen.writeEndObject();
-
-                               gen.writeEndObject();
-                       }
-                       gen.writeEndArray();
-
-                       gen.writeEndObject();
-               }
-
-               // the generator is closed (and thereby flushed) at this point
-               return writer.toString();
-       }
-
-       /**
-        * Groups the subtasks of the job vertex by the TaskManager they are currently assigned to.
-        * The key is "hostname:dataPort", or "(unassigned)" for subtasks without a location.
-        */
-       private static Map<String, List<AccessExecutionVertex>> groupByTaskManager(AccessExecutionJobVertex jobVertex) {
-               Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
-
-               for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) {
-                       TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
-                       String taskManager = location == null ? "(unassigned)" : location.getHostname() + ":" + location.dataPort();
-
-                       taskManagerVertices.computeIfAbsent(taskManager, key -> new ArrayList<>()).add(vertex);
-               }
-
-               return taskManagerVertices;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
deleted file mode 100644
index 4ce0baf..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JsonFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-/**
- * A holder for the singleton Jackson JSON factory. Because Jackson's JSON factory
- * is a heavyweight object that is encouraged to be shared, a single instance is used
- * across all request handlers.
- *
- * <p>This class only exposes the {@link #JACKSON_FACTORY} constant.
- */
-public class JsonFactory {
-
-       /** The singleton Jackson JSON factory, created once when this class is initialized. */
-       public static final com.fasterxml.jackson.core.JsonFactory 
JACKSON_FACTORY =
-                       new com.fasterxml.jackson.core.JsonFactory();
-
-       // 
--------------------------------------------------------------------------------------------
-
-       private JsonFactory() {} // private constructor: prevents instantiation from outside this class
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
deleted file mode 100644
index 8ca785f..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandler.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpResponse;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-
-/**
- * Base interface for all request handlers.
- *
- * <p>Most handlers will want to use the {@link AbstractJsonRequestHandler}
- * as a starting point, which produces a valid HTTP response.
- */
-public interface RequestHandler {
-
-       /**
-        * Core method that handles the request and generates the response. The method needs to
-        * respond with a full HTTP response, including content-type, content-length, etc. The
-        * response is handed back asynchronously through the returned future.
-        *
-        * <p>Exceptions may be thrown and will be handled.
-        *
-        * @param pathParams The map of REST path parameters, decoded by the router.
-        * @param queryParams The map of query parameters.
-        * @param jobManagerGateway The gateway used to talk to the JobManager.
-        *
-        * @return A future that completes with the full HTTP response.
-        */
-       CompletableFuture<FullHttpResponse> handleRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway);
-
-       /**
-        * Returns an array of REST URLs under which this handler can be registered.
-        *
-        * @return array containing the REST URLs under which this handler can be registered.
-        */
-       String[] getPaths();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
deleted file mode 100644
index bb61d16..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/RequestHandlerException.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-/**
- * Base class for request handler exceptions.
- *
- * <p>This is a checked exception (it extends {@link Exception}) that carries only a
- * detail message; no constructor accepting a cause is offered.
- */
-public class RequestHandlerException extends Exception {
-
-       private static final long serialVersionUID = 7570352908725875886L;
-
-       public RequestHandlerException(String message) {
-               super(message); // forwarded unchanged as the exception's detail message
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
deleted file mode 100644
index 301b217..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskCurrentAttemptDetailsHandler.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that reports the details of the current execution attempt of a subtask.
- *
- * <p>The handler is registered under a path without an attempt number and answers a request
- * by forwarding the subtask's current attempt to the attempt-based {@code handleRequest}
- * overload that this class does not define itself.
- */
-public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttemptDetailsHandler {
-
-       public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum";
-
-       public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) {
-               super(executionGraphHolder, executor, fetcher);
-       }
-
-       /** Returns the single REST path served by this handler, in a freshly created array. */
-       @Override
-       public String[] getPaths() {
-               final String[] paths = {SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH};
-               return paths;
-       }
-
-       /** Answers the request with the details of the vertex's current execution attempt. */
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionVertex vertex, Map<String, String> params) {
-               return handleRequest(vertex.getCurrentExecutionAttempt(), params);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
deleted file mode 100644
index 3c0d1d9..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the user accumulators of a single execution attempt of a
- * subtask.
- *
- * <p>The attempt is addressed through the "jobid", "vertexid", "subtasknum" and "attempt"
- * parameters of the REST path.
- */
-public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptRequestHandler {
-
-       private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators";
-
-       public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-               super(executionGraphHolder, executor);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH};
-       }
-
-       /**
-        * Builds the accumulator JSON for the given attempt asynchronously on the executor.
-        *
-        * @param execAttempt execution attempt whose accumulators are returned
-        * @param params request parameters (not read by this handler)
-        * @return future holding the JSON string; an {@link IOException} during creation is
-        *         rethrown wrapped in a {@link FlinkFutureException}
-        */
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecution execAttempt, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return createAttemptAccumulatorsJson(execAttempt);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could not create accumulator json.", e);
-                               }
-                       },
-                       executor);
-       }
-
-       /**
-        * Archivist for the SubtaskExecutionAttemptAccumulatorsHandler.
-        *
-        * <p>For every subtask of every job vertex it archives the accumulators of the current
-        * attempt, followed by those of each prior attempt with an index below the current
-        * attempt number.
-        */
-       public static class SubtaskExecutionAttemptAccumulatorsJsonArchivist implements JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       List<ArchivedJson> archive = new ArrayList<>();
-                       for (AccessExecutionJobVertex task : graph.getAllVertices().values()) {
-                               for (AccessExecutionVertex subtask : task.getTaskVertices()) {
-                                       // read the current attempt once so that JSON, path and loop bound
-                                       // all describe the same attempt
-                                       final AccessExecution currentAttempt = subtask.getCurrentExecutionAttempt();
-
-                                       String curAttemptJson = createAttemptAccumulatorsJson(currentAttempt);
-                                       String curAttemptPath = createAttemptPath(graph, task, subtask, currentAttempt);
-                                       archive.add(new ArchivedJson(curAttemptPath, curAttemptJson));
-
-                                       for (int x = 0; x < currentAttempt.getAttemptNumber(); x++) {
-                                               AccessExecution attempt = subtask.getPriorExecutionAttempt(x);
-                                               String json = createAttemptAccumulatorsJson(attempt);
-                                               String path = createAttemptPath(graph, task, subtask, attempt);
-                                               archive.add(new ArchivedJson(path, json));
-                                       }
-                               }
-                       }
-                       return archive;
-               }
-       }
-
-       /**
-        * Fills the placeholders of the handler's REST path with the ids of the given job,
-        * job vertex, subtask and attempt.
-        */
-       private static String createAttemptPath(
-                       AccessExecutionGraph graph,
-                       AccessExecutionJobVertex task,
-                       AccessExecutionVertex subtask,
-                       AccessExecution attempt) {
-               return SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH
-                       .replace(":jobid", graph.getJobID().toString())
-                       .replace(":vertexid", task.getJobVertexId().toString())
-                       .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex()))
-                       .replace(":attempt", String.valueOf(attempt.getAttemptNumber()));
-       }
-
-       /**
-        * Renders the user accumulators of an execution attempt as a JSON string.
-        *
-        * <p>The document contains the "subtask" index, the "attempt" number, the attempt "id"
-        * and a "user-accumulators" array with the "name", "type" and "value" of each accumulator.
-        *
-        * @param execAttempt execution attempt whose accumulators are rendered
-        * @return the JSON document
-        * @throws IOException if writing the JSON fails
-        */
-       public static String createAttemptAccumulatorsJson(AccessExecution execAttempt) throws IOException {
-               final StringWriter writer = new StringWriter();
-
-               // try-with-resources closes the generator even when one of the writes below throws
-               try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
-                       final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
-
-                       gen.writeStartObject();
-
-                       gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex());
-                       gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
-                       gen.writeStringField("id", execAttempt.getAttemptId().toString());
-
-                       gen.writeArrayFieldStart("user-accumulators");
-                       for (StringifiedAccumulatorResult acc : accs) {
-                               gen.writeStartObject();
-                               gen.writeStringField("name", acc.getName());
-                               gen.writeStringField("type", acc.getType());
-                               gen.writeStringField("value", acc.getValue());
-                               gen.writeEndObject();
-                       }
-                       gen.writeEndArray();
-
-                       gen.writeEndObject();
-               }
-
-               // the generator is closed (and thereby flushed) at this point
-               return writer.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
deleted file mode 100644
index ad836df..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static 
org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH;
-
-/**
- * Request handler providing details about a single task execution attempt.
- */
-public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemptRequestHandler {
-
-       private static final String SUBTASK_ATTEMPT_DETAILS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt";
-
-       private final MetricFetcher fetcher;
-
-       public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor, MetricFetcher fetcher) {
-               super(executionGraphHolder, executor);
-               this.fetcher = fetcher;
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{SUBTASK_ATTEMPT_DETAILS_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecution 
execAttempt, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return 
createAttemptDetailsJson(execAttempt, params.get("jobid"), 
params.get("vertexid"), fetcher);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could 
not create attempt details json.", e);
-                               }
-                       },
-                       executor);
-       }
-
-       /**
-        * Archivist for the SubtaskExecutionAttemptDetailsHandler.
-        */
-       public static class SubtaskExecutionAttemptDetailsJsonArchivist 
implements JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       List<ArchivedJson> archive = new ArrayList<>();
-                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
-                               for (AccessExecutionVertex subtask : 
task.getTaskVertices()) {
-                                       String curAttemptJson = 
createAttemptDetailsJson(subtask.getCurrentExecutionAttempt(), 
graph.getJobID().toString(), task.getJobVertexId().toString(), null);
-                                       String curAttemptPath1 = 
SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH
-                                               .replace(":jobid", 
graph.getJobID().toString())
-                                               .replace(":vertexid", 
task.getJobVertexId().toString())
-                                               .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()));
-                                       String curAttemptPath2 = 
SUBTASK_ATTEMPT_DETAILS_REST_PATH
-                                               .replace(":jobid", 
graph.getJobID().toString())
-                                               .replace(":vertexid", 
task.getJobVertexId().toString())
-                                               .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()))
-                                               .replace(":attempt", 
String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber()));
-
-                                       archive.add(new 
ArchivedJson(curAttemptPath1, curAttemptJson));
-                                       archive.add(new 
ArchivedJson(curAttemptPath2, curAttemptJson));
-
-                                       for (int x = 0; x < 
subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
-                                               AccessExecution attempt = 
subtask.getPriorExecutionAttempt(x);
-                                               String json = 
createAttemptDetailsJson(attempt, graph.getJobID().toString(), 
task.getJobVertexId().toString(), null);
-                                               String path = 
SUBTASK_ATTEMPT_DETAILS_REST_PATH
-                                                       .replace(":jobid", 
graph.getJobID().toString())
-                                                       .replace(":vertexid", 
task.getJobVertexId().toString())
-                                                       .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()))
-                                                       .replace(":attempt", 
String.valueOf(attempt.getAttemptNumber()));
-                                               archive.add(new 
ArchivedJson(path, json));
-                                       }
-                               }
-                       }
-                       return archive;
-               }
-       }
-
-       public static String createAttemptDetailsJson(
-                       AccessExecution execAttempt,
-                       String jobID,
-                       String vertexID,
-                       @Nullable MetricFetcher fetcher) throws IOException {
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-               final ExecutionState status = execAttempt.getState();
-               final long now = System.currentTimeMillis();
-
-               TaskManagerLocation location = 
execAttempt.getAssignedResourceLocation();
-               String locationString = location == null ? "(unassigned)" : 
location.getHostname();
-
-               long startTime = 
execAttempt.getStateTimestamp(ExecutionState.DEPLOYING);
-               if (startTime == 0) {
-                       startTime = -1;
-               }
-               long endTime = status.isTerminal() ? 
execAttempt.getStateTimestamp(status) : -1;
-               long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) 
- startTime) : -1;
-
-               gen.writeStartObject();
-               gen.writeNumberField("subtask", 
execAttempt.getParallelSubtaskIndex());
-               gen.writeStringField("status", status.name());
-               gen.writeNumberField("attempt", execAttempt.getAttemptNumber());
-               gen.writeStringField("host", locationString);
-               gen.writeNumberField("start-time", startTime);
-               gen.writeNumberField("end-time", endTime);
-               gen.writeNumberField("duration", duration);
-
-               MutableIOMetrics counts = new MutableIOMetrics();
-
-               counts.addIOMetrics(
-                       execAttempt,
-                       fetcher,
-                       jobID,
-                       vertexID
-               );
-
-               counts.writeIOMetricsAsJson(gen);
-
-               gen.writeEndObject();
-
-               gen.close();
-               return writer.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
deleted file mode 100644
index 8142548..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the accumulators for all subtasks of job 
vertex.
- */
-public class SubtasksAllAccumulatorsHandler extends 
AbstractJobVertexRequestHandler {
-
-       private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH =       
"/jobs/:jobid/vertices/:vertexid/subtasks/accumulators";
-
-       public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
-               super(executionGraphHolder, executor);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{SUBTASKS_ALL_ACCUMULATORS_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex 
jobVertex, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return 
createSubtasksAccumulatorsJson(jobVertex);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could 
not create subtasks accumulator json.", e);
-                               }
-                       },
-                       executor);
-       }
-
-       /**
-        * Archivist for the SubtasksAllAccumulatorsHandler.
-        */
-       public static class SubtasksAllAccumulatorsJsonArchivist implements 
JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       List<ArchivedJson> archive = new ArrayList<>();
-                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
-                               String json = 
createSubtasksAccumulatorsJson(task);
-                               String path = 
SUBTASKS_ALL_ACCUMULATORS_REST_PATH
-                                       .replace(":jobid", 
graph.getJobID().toString())
-                                       .replace(":vertexid", 
task.getJobVertexId().toString());
-                               archive.add(new ArchivedJson(path, json));
-                       }
-                       return archive;
-               }
-       }
-
-       public static String 
createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws 
IOException {
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-               gen.writeStartObject();
-               gen.writeStringField("id", 
jobVertex.getJobVertexId().toString());
-               gen.writeNumberField("parallelism", jobVertex.getParallelism());
-
-               gen.writeArrayFieldStart("subtasks");
-
-               int num = 0;
-               for (AccessExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
-
-                       TaskManagerLocation location = 
vertex.getCurrentAssignedResourceLocation();
-                       String locationString = location == null ? 
"(unassigned)" : location.getHostname();
-
-                       gen.writeStartObject();
-
-                       gen.writeNumberField("subtask", num++);
-                       gen.writeNumberField("attempt", 
vertex.getCurrentExecutionAttempt().getAttemptNumber());
-                       gen.writeStringField("host", locationString);
-
-                       StringifiedAccumulatorResult[] accs = 
vertex.getCurrentExecutionAttempt().getUserAccumulatorsStringified();
-                       gen.writeArrayFieldStart("user-accumulators");
-                       for (StringifiedAccumulatorResult acc : accs) {
-                               gen.writeStartObject();
-                               gen.writeStringField("name", acc.getName());
-                               gen.writeStringField("type", acc.getType());
-                               gen.writeStringField("value", acc.getValue());
-                               gen.writeEndObject();
-                       }
-                       gen.writeEndArray();
-
-                       gen.writeEndObject();
-               }
-               gen.writeEndArray();
-
-               gen.writeEndObject();
-               gen.close();
-               return writer.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
deleted file mode 100644
index d766206..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the state transition timestamps for all 
subtasks, plus their
- * location and duration.
- */
-public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
-
-       private static final String SUBTASK_TIMES_REST_PATH =   
"/jobs/:jobid/vertices/:vertexid/subtasktimes";
-
-       public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder, 
Executor executor) {
-               super(executionGraphHolder, executor);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{SUBTASK_TIMES_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionJobVertex 
jobVertex, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return 
createSubtaskTimesJson(jobVertex);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could 
not write subtask time json.", e);
-                               }
-                       },
-                       executor);
-       }
-
-       /**
-        * Archivist for the SubtasksTimesHandler.
-        */
-       public static class SubtasksTimesJsonArchivist implements JsonArchivist 
{
-
-               @Override
-               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       List<ArchivedJson> archive = new ArrayList<>();
-                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
-                               String json = createSubtaskTimesJson(task);
-                               String path = SUBTASK_TIMES_REST_PATH
-                                       .replace(":jobid", 
graph.getJobID().toString())
-                                       .replace(":vertexid", 
task.getJobVertexId().toString());
-                               archive.add(new ArchivedJson(path, json));
-                       }
-                       return archive;
-               }
-       }
-
-       public static String createSubtaskTimesJson(AccessExecutionJobVertex 
jobVertex) throws IOException {
-               final long now = System.currentTimeMillis();
-
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-               gen.writeStartObject();
-
-               gen.writeStringField("id", 
jobVertex.getJobVertexId().toString());
-               gen.writeStringField("name", jobVertex.getName());
-               gen.writeNumberField("now", now);
-
-               gen.writeArrayFieldStart("subtasks");
-
-               int num = 0;
-               for (AccessExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
-
-                       long[] timestamps = 
vertex.getCurrentExecutionAttempt().getStateTimestamps();
-                       ExecutionState status = vertex.getExecutionState();
-
-                       long scheduledTime = 
timestamps[ExecutionState.SCHEDULED.ordinal()];
-
-                       long start = scheduledTime > 0 ? scheduledTime : -1;
-                       long end = status.isTerminal() ? 
timestamps[status.ordinal()] : now;
-                       long duration = start >= 0 ? end - start : -1L;
-
-                       gen.writeStartObject();
-                       gen.writeNumberField("subtask", num++);
-
-                       TaskManagerLocation location = 
vertex.getCurrentAssignedResourceLocation();
-                       String locationString = location == null ? 
"(unassigned)" : location.getHostname();
-                       gen.writeStringField("host", locationString);
-
-                       gen.writeNumberField("duration", duration);
-
-                       gen.writeObjectFieldStart("timestamps");
-                       for (ExecutionState state : ExecutionState.values()) {
-                               gen.writeNumberField(state.name(), 
timestamps[state.ordinal()]);
-                       }
-                       gen.writeEndObject();
-
-                       gen.writeEndObject();
-               }
-
-               gen.writeEndArray();
-               gen.writeEndObject();
-
-               gen.close();
-               return writer.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
deleted file mode 100644
index d53d2b1..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandler.java
+++ /dev/null
@@ -1,335 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-/*****************************************************************************
- * This code is based on the "HttpStaticFileServerHandler" from the
- * Netty project's HTTP server example.
- *
- * See http://netty.io and
- * 
https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
- *****************************************************************************/
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobCache;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.blob.BlobView;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.rest.handler.RedirectHandler;
-import org.apache.flink.runtime.webmonitor.WebHandler;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.StringUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.buffer.Unpooled;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultHttpResponse;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpChunkedInput;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.LastHttpContent;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
-import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedFile;
-import org.apache.flink.shaded.netty4.io.netty.util.concurrent.Future;
-import 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.GenericFutureListener;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
-import java.nio.channels.FileChannel;
-import java.util.HashMap;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executor;
-
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.OK;
-import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Request handler that returns the TaskManager log/out files.
- *
- * <p>This code is based on the "HttpStaticFileServerHandler" from the Netty 
project's HTTP server
- * example.</p>
- */
-@ChannelHandler.Sharable
-public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> 
implements WebHandler {
-       private static final Logger LOG = 
LoggerFactory.getLogger(TaskManagerLogHandler.class);
-
-       private static final String TASKMANAGER_LOG_REST_PATH = 
"/taskmanagers/:taskmanagerid/log";
-       private static final String TASKMANAGER_OUT_REST_PATH = 
"/taskmanagers/:taskmanagerid/stdout";
-
-       /** Keep track of last transmitted log, to clean up old ones. */
-       private final HashMap<String, BlobKey> lastSubmittedLog = new 
HashMap<>();
-       private final HashMap<String, BlobKey> lastSubmittedStdout = new 
HashMap<>();
-
-       /** Keep track of request status, prevents multiple log requests for a 
single TM running concurrently. */
-       private final ConcurrentHashMap<String, Boolean> lastRequestPending = 
new ConcurrentHashMap<>();
-       private final Configuration config;
-
-       /** Future of the blob cache. */
-       private CompletableFuture<BlobCache> cache;
-
-       /** Indicates which log file should be displayed. */
-       private FileMode fileMode;
-
-       private final Executor executor;
-
-       private final BlobView blobView;
-
-       /** Used to control whether this handler serves the .log or .out file. 
*/
-       public enum FileMode {
-               LOG,
-               STDOUT
-       }
-
-       public TaskManagerLogHandler(
-               GatewayRetriever<JobManagerGateway> retriever,
-               Executor executor,
-               CompletableFuture<String> localJobManagerAddressPromise,
-               Time timeout,
-               FileMode fileMode,
-               Configuration config,
-               BlobView blobView) {
-               super(localJobManagerAddressPromise, retriever, timeout);
-
-               this.executor = checkNotNull(executor);
-               this.config = config;
-               this.fileMode = fileMode;
-
-               this.blobView = Preconditions.checkNotNull(blobView, 
"blobView");
-       }
-
-       @Override
-       public String[] getPaths() {
-               switch (fileMode) {
-                       case LOG:
-                               return new String[]{TASKMANAGER_LOG_REST_PATH};
-                       case STDOUT:
-                       default:
-                               return new String[]{TASKMANAGER_OUT_REST_PATH};
-               }
-       }
-
-       /**
-        * Response when running with leading JobManager.
-        */
-       @Override
-       protected void respondAsLeader(final ChannelHandlerContext ctx, final 
Routed routed, final JobManagerGateway jobManagerGateway) {
-               if (cache == null) {
-                       CompletableFuture<Integer> blobPortFuture = 
jobManagerGateway.requestBlobServerPort(timeout);
-                       cache = blobPortFuture.thenApplyAsync(
-                               (Integer port) -> {
-                                       try {
-                                               return new BlobCache(new 
InetSocketAddress(jobManagerGateway.getHostname(), port), config, blobView);
-                                       } catch (IOException e) {
-                                               throw new 
FlinkFutureException("Could not create BlobCache.", e);
-                                       }
-                               },
-                               executor);
-               }
-
-               final String taskManagerID = 
routed.pathParams().get(TaskManagersHandler.TASK_MANAGER_ID_KEY);
-               final HttpRequest request = routed.request();
-
-               //fetch TaskManager logs if no other process is currently doing 
it
-               if (lastRequestPending.putIfAbsent(taskManagerID, true) == 
null) {
-                       try {
-                               InstanceID instanceID = new 
InstanceID(StringUtils.hexStringToByte(taskManagerID));
-                               CompletableFuture<Optional<Instance>> 
taskManagerFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, 
timeout);
-
-                               CompletableFuture<BlobKey> blobKeyFuture = 
taskManagerFuture.thenCompose(
-                                       (Optional<Instance> optTMInstance) -> {
-                                               Instance taskManagerInstance = 
optTMInstance.orElseThrow(
-                                                       () -> new 
FlinkFutureException("Could not find instance with " + instanceID + '.'));
-                                               switch (fileMode) {
-                                                       case LOG:
-                                                               return 
taskManagerInstance.getTaskManagerGateway().requestTaskManagerLog(timeout);
-                                                       case STDOUT:
-                                                       default:
-                                                               return 
taskManagerInstance.getTaskManagerGateway().requestTaskManagerStdout(timeout);
-                                               }
-                                       }
-                               );
-
-                               CompletableFuture<String> logPathFuture = 
blobKeyFuture
-                                       .thenCombineAsync(
-                                               cache,
-                                               (blobKey, blobCache) -> {
-                                                       //delete previous log 
file, if it is different than the current one
-                                                       HashMap<String, 
BlobKey> lastSubmittedFile = fileMode == FileMode.LOG ? lastSubmittedLog : 
lastSubmittedStdout;
-                                                       if 
(lastSubmittedFile.containsKey(taskManagerID)) {
-                                                               if 
(!Objects.equals(blobKey, lastSubmittedFile.get(taskManagerID))) {
-                                                                       try {
-                                                                               
blobCache.deleteGlobal(lastSubmittedFile.get(taskManagerID));
-                                                                       } catch 
(IOException e) {
-                                                                               
throw new FlinkFutureException("Could not delete file for " + taskManagerID + 
'.', e);
-                                                                       }
-                                                                       
lastSubmittedFile.put(taskManagerID, blobKey);
-                                                               }
-                                                       } else {
-                                                               
lastSubmittedFile.put(taskManagerID, blobKey);
-                                                       }
-                                                       try {
-                                                               return 
blobCache.getFile(blobKey).getAbsolutePath();
-                                                       } catch (IOException e) 
{
-                                                               throw new 
FlinkFutureException("Could not retrieve blob for " + blobKey + '.', e);
-                                                       }
-                                               },
-                                               executor);
-
-                               logPathFuture.exceptionally(
-                                       failure -> {
-                                               display(ctx, request, "Fetching 
TaskManager log failed.");
-                                               LOG.error("Fetching TaskManager 
log failed.", failure);
-                                               
lastRequestPending.remove(taskManagerID);
-
-                                               return null;
-                                       });
-
-                               logPathFuture.thenAccept(
-                                       filePath -> {
-                                               File file = new File(filePath);
-                                               final RandomAccessFile raf;
-                                               try {
-                                                       raf = new 
RandomAccessFile(file, "r");
-                                               } catch (FileNotFoundException 
e) {
-                                                       display(ctx, request, 
"Displaying TaskManager log failed.");
-                                                       LOG.error("Displaying 
TaskManager log failed.", e);
-
-                                                       return;
-                                               }
-                                               long fileLength;
-                                               try {
-                                                       fileLength = 
raf.length();
-                                               } catch (IOException ioe) {
-                                                       display(ctx, request, 
"Displaying TaskManager log failed.");
-                                                       LOG.error("Displaying 
TaskManager log failed.", ioe);
-                                                       try {
-                                                               raf.close();
-                                                       } catch (IOException e) 
{
-                                                               
LOG.error("Could not close random access file.", e);
-                                                       }
-
-                                                       return;
-                                               }
-                                               final FileChannel fc = 
raf.getChannel();
-
-                                               HttpResponse response = new 
DefaultHttpResponse(HTTP_1_1, OK);
-                                               
response.headers().set(CONTENT_TYPE, "text/plain");
-
-                                               if 
(HttpHeaders.isKeepAlive(request)) {
-                                                       
response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
-                                               }
-                                               
HttpHeaders.setContentLength(response, fileLength);
-
-                                               // write the initial line and 
the header.
-                                               ctx.write(response);
-
-                                               // write the content.
-                                               ChannelFuture lastContentFuture;
-                                               final 
GenericFutureListener<Future<? super Void>> completionListener = future -> {
-                                                       
lastRequestPending.remove(taskManagerID);
-                                                       fc.close();
-                                                       raf.close();
-                                               };
-                                               if 
(ctx.pipeline().get(SslHandler.class) == null) {
-                                                       ctx.write(
-                                                               new 
DefaultFileRegion(fc, 0, fileLength), ctx.newProgressivePromise())
-                                                                       
.addListener(completionListener);
-                                                       lastContentFuture = 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-
-                                               } else {
-                                                       try {
-                                                               
lastContentFuture = ctx.writeAndFlush(
-                                                                       new 
HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)),
-                                                                       
ctx.newProgressivePromise())
-                                                                       
.addListener(completionListener);
-                                                       } catch (IOException e) 
{
-                                                               display(ctx, 
request, "Displaying TaskManager log failed.");
-                                                               LOG.warn("Could 
not write http data.", e);
-
-                                                               return;
-                                                       }
-                                                       // HttpChunkedInput 
will write the end marker (LastHttpContent) for us.
-                                               }
-
-                                               // close the connection, if no 
keep-alive is needed
-                                               if 
(!HttpHeaders.isKeepAlive(request)) {
-                                                       
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
-                                               }
-                                       });
-                       } catch (Exception e) {
-                               display(ctx, request, "Error: " + 
e.getMessage());
-                               LOG.error("Fetching TaskManager log failed.", 
e);
-                               lastRequestPending.remove(taskManagerID);
-                       }
-               } else {
-                       display(ctx, request, "loading...");
-               }
-       }
-
-       private void display(ChannelHandlerContext ctx, HttpRequest request, 
String message) {
-               HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
-               response.headers().set(CONTENT_TYPE, "text/plain");
-
-               if (HttpHeaders.isKeepAlive(request)) {
-                       response.headers().set(CONNECTION, 
HttpHeaders.Values.KEEP_ALIVE);
-               }
-
-               byte[] buf = message.getBytes(ConfigConstants.DEFAULT_CHARSET);
-
-               ByteBuf b = Unpooled.copiedBuffer(buf);
-
-               HttpHeaders.setContentLength(response, buf.length);
-
-               // write the initial line and the header.
-               ctx.write(response);
-
-               ctx.write(b);
-
-               ChannelFuture lastContentFuture = 
ctx.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
-
-               // close the connection, if no keep-alive is needed
-               if (!HttpHeaders.isKeepAlive(request)) {
-                       
lastContentFuture.addListener(ChannelFutureListener.CLOSE);
-               }
-       }
-}

Reply via email to