http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
deleted file mode 100644
index 9f83ed0..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandler.java
+++ /dev/null
@@ -1,205 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
-import org.apache.flink.util.StringUtils;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A request handler that provides an overview over all taskmanagers or details for a single one.
- *
- * <p>Both REST paths answer with a JSON object holding a {@code taskmanagers} array. JVM and
- * network metrics are only attached when a single TaskManager is addressed through the
- * {@code :taskmanagerid} path parameter.
- */
-public class TaskManagersHandler extends AbstractJsonRequestHandler {
-
-       private static final String TASKMANAGERS_REST_PATH = "/taskmanagers";
-       private static final String TASKMANAGER_DETAILS_REST_PATH = "/taskmanagers/:taskmanagerid";
-
-       public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
-
-       private final Time timeout;
-
-       private final MetricFetcher fetcher;
-
-       /**
-        * Creates the handler.
-        *
-        * @param executor executor passed to the base class; the JSON responses are produced on it
-        * @param timeout timeout used for the TaskManager instance requests, must not be null
-        * @param fetcher metric fetcher queried when the details of a single TaskManager are requested
-        */
-       public TaskManagersHandler(Executor executor, Time timeout, MetricFetcher fetcher) {
-               super(executor);
-               this.timeout = requireNonNull(timeout);
-               this.fetcher = fetcher;
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{TASKMANAGERS_REST_PATH, TASKMANAGER_DETAILS_REST_PATH};
-       }
-
-       /**
-        * Requests either one or all TaskManager instances from the JobManager and renders them as JSON.
-        *
-        * @param pathParams path parameters; the presence of {@link #TASK_MANAGER_ID_KEY} selects the details view
-        * @param queryParams query parameters (not used by this handler)
-        * @param jobManagerGateway gateway to the leading JobManager, may be null if there is no connection
-        * @return future with the JSON response; completed exceptionally if {@code jobManagerGateway} is null
-        */
-       @Override
-       public CompletableFuture<String> handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, JobManagerGateway jobManagerGateway) {
-               if (jobManagerGateway == null) {
-                       return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager."));
-               }
-
-               // whether one task manager's metrics are requested, or all task manager, we
-               // return them in an array. This avoids unnecessary code complexity.
-               // If only one task manager is requested, we only fetch one task manager metrics.
-               if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
-                       InstanceID instanceID = new InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
-                       CompletableFuture<Optional<Instance>> tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, timeout);
-
-                       // an unknown TaskManager id results in an empty "taskmanagers" array
-                       return tmInstanceFuture.thenApplyAsync(
-                               (Optional<Instance> optTaskManager) -> writeTaskManagersJsonUnchecked(
-                                       optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()),
-                                       pathParams),
-                               executor);
-               } else {
-                       CompletableFuture<Collection<Instance>> tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
-
-                       return tmInstancesFuture.thenApplyAsync(
-                               (Collection<Instance> taskManagers) -> writeTaskManagersJsonUnchecked(taskManagers, pathParams),
-                               executor);
-               }
-       }
-
-       /**
-        * Calls {@link #writeTaskManagersJson(Collection, Map)} and rethrows an {@link IOException}
-        * as {@link FlinkFutureException} so that it can be used inside future callbacks.
-        */
-       private String writeTaskManagersJsonUnchecked(Collection<Instance> instances, Map<String, String> pathParams) {
-               try {
-                       return writeTaskManagersJson(instances, pathParams);
-               } catch (IOException e) {
-                       throw new FlinkFutureException("Could not write TaskManagers JSON.", e);
-               }
-       }
-
-       /**
-        * Serializes the given instances into the {@code {"taskmanagers": [...]}} JSON response.
-        *
-        * @param instances TaskManager instances to serialize
-        * @param pathParams path parameters of the request; metrics are only written if they contain {@link #TASK_MANAGER_ID_KEY}
-        * @return the JSON response
-        * @throws IOException if writing the JSON fails
-        */
-       private String writeTaskManagersJson(Collection<Instance> instances, Map<String, String> pathParams) throws IOException {
-               // only send metrics when only one task manager requests them.
-               final boolean includeMetrics = pathParams.containsKey(TASK_MANAGER_ID_KEY);
-               final StringWriter writer = new StringWriter();
-
-               // try-with-resources closes the generator even if one of the writes fails
-               try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
-                       gen.writeStartObject();
-                       gen.writeArrayFieldStart("taskmanagers");
-
-                       for (Instance instance : instances) {
-                               gen.writeStartObject();
-                               gen.writeStringField("id", instance.getId().toString());
-                               gen.writeStringField("path", instance.getTaskManagerGateway().getAddress());
-                               gen.writeNumberField("dataPort", instance.getTaskManagerLocation().dataPort());
-                               // NOTE(review): the field name suggests an elapsed time, the value is whatever getLastHeartBeat() returns - confirm
-                               gen.writeNumberField("timeSinceLastHeartbeat", instance.getLastHeartBeat());
-                               gen.writeNumberField("slotsNumber", instance.getTotalNumberOfSlots());
-                               gen.writeNumberField("freeSlots", instance.getNumberOfAvailableSlots());
-                               gen.writeNumberField("cpuCores", instance.getResources().getNumberOfCPUCores());
-                               gen.writeNumberField("physicalMemory", instance.getResources().getSizeOfPhysicalMemory());
-                               gen.writeNumberField("freeMemory", instance.getResources().getSizeOfJvmHeap());
-                               gen.writeNumberField("managedMemory", instance.getResources().getSizeOfManagedMemory());
-
-                               if (includeMetrics) {
-                                       writeMetrics(gen, instance.getId().toString());
-                               }
-
-                               gen.writeEndObject();
-                       }
-
-                       gen.writeEndArray();
-                       gen.writeEndObject();
-               }
-
-               return writer.toString();
-       }
-
-       /**
-        * Updates the metric fetcher and writes the {@code metrics} object of the given TaskManager.
-        * Nothing is written if the metric store has no entry for the TaskManager.
-        */
-       private void writeMetrics(JsonGenerator gen, String taskManagerId) throws IOException {
-               fetcher.update();
-               MetricStore.TaskManagerMetricStore metrics = fetcher.getMetricStore().getTaskManagerMetricStore(taskManagerId);
-               if (metrics == null) {
-                       return;
-               }
-
-               gen.writeObjectFieldStart("metrics");
-
-               long heapUsed = longMetric(metrics, "Status.JVM.Memory.Heap.Used");
-               long heapCommitted = longMetric(metrics, "Status.JVM.Memory.Heap.Committed");
-               long heapTotal = longMetric(metrics, "Status.JVM.Memory.Heap.Max");
-
-               gen.writeNumberField("heapCommitted", heapCommitted);
-               gen.writeNumberField("heapUsed", heapUsed);
-               gen.writeNumberField("heapMax", heapTotal);
-
-               long nonHeapUsed = longMetric(metrics, "Status.JVM.Memory.NonHeap.Used");
-               long nonHeapCommitted = longMetric(metrics, "Status.JVM.Memory.NonHeap.Committed");
-               long nonHeapTotal = longMetric(metrics, "Status.JVM.Memory.NonHeap.Max");
-
-               gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
-               gen.writeNumberField("nonHeapUsed", nonHeapUsed);
-               gen.writeNumberField("nonHeapMax", nonHeapTotal);
-
-               gen.writeNumberField("totalCommitted", heapCommitted + nonHeapCommitted);
-               gen.writeNumberField("totalUsed", heapUsed + nonHeapUsed);
-               gen.writeNumberField("totalMax", heapTotal + nonHeapTotal);
-
-               long directCount = longMetric(metrics, "Status.JVM.Memory.Direct.Count");
-               long directUsed = longMetric(metrics, "Status.JVM.Memory.Direct.MemoryUsed");
-               long directMax = longMetric(metrics, "Status.JVM.Memory.Direct.TotalCapacity");
-
-               gen.writeNumberField("directCount", directCount);
-               gen.writeNumberField("directUsed", directUsed);
-               gen.writeNumberField("directMax", directMax);
-
-               long mappedCount = longMetric(metrics, "Status.JVM.Memory.Mapped.Count");
-               long mappedUsed = longMetric(metrics, "Status.JVM.Memory.Mapped.MemoryUsed");
-               long mappedMax = longMetric(metrics, "Status.JVM.Memory.Mapped.TotalCapacity");
-
-               gen.writeNumberField("mappedCount", mappedCount);
-               gen.writeNumberField("mappedUsed", mappedUsed);
-               gen.writeNumberField("mappedMax", mappedMax);
-
-               long memorySegmentsAvailable = longMetric(metrics, "Status.Network.AvailableMemorySegments");
-               long memorySegmentsTotal = longMetric(metrics, "Status.Network.TotalMemorySegments");
-
-               gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
-               gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
-
-               gen.writeArrayFieldStart("garbageCollectors");
-
-               for (String gcName : metrics.garbageCollectorNames) {
-                       String count = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
-                       String time = metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
-                       // a collector is only listed once both of its metrics are available
-                       if (count != null && time != null) {
-                               gen.writeStartObject();
-                               gen.writeStringField("name", gcName);
-                               gen.writeNumberField("count", Long.parseLong(count));
-                               gen.writeNumberField("time", Long.parseLong(time));
-                               gen.writeEndObject();
-                       }
-               }
-
-               gen.writeEndArray();
-               gen.writeEndObject();
-       }
-
-       /**
-        * Reads the metric with the given name as a primitive long, passing {@code "0"} as the
-        * default value to the metric store.
-        */
-       private static long longMetric(MetricStore.TaskManagerMetricStore metrics, String name) {
-               return Long.parseLong(metrics.getMetric(name, "0"));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
deleted file mode 100644
index 3affd7c..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import 
org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Handler that returns a job's snapshotting settings.
- */
-public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandler {
-
-       private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config";
-
-       public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) {
-               super(executionGraphHolder, executor);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{CHECKPOINT_CONFIG_REST_PATH};
-       }
-
-       /**
-        * Creates the checkpoint config JSON for the given graph asynchronously on the handler's executor.
-        *
-        * @param graph execution graph whose checkpointing settings are rendered
-        * @param params request parameters (not used by this handler)
-        * @return future with the JSON response; an {@link IOException} is rethrown as {@link FlinkFutureException}
-        */
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return createCheckpointConfigJson(graph);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could not create checkpoint config json.", e);
-                               }
-                       },
-                       executor);
-       }
-
-       /**
-        * Archivist for the CheckpointConfigHandler.
-        */
-       public static class CheckpointConfigJsonArchivist implements JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       String json = createCheckpointConfigJson(graph);
-                       String path = CHECKPOINT_CONFIG_REST_PATH
-                               .replace(":jobid", graph.getJobID().toString());
-                       return Collections.singletonList(new ArchivedJson(path, json));
-               }
-       }
-
-       /**
-        * Renders the checkpointing settings of the given graph as JSON.
-        *
-        * @param graph execution graph to read the settings from
-        * @return the settings as JSON object, or {@code "{}"} if the graph has no checkpointing settings
-        * @throws IOException if writing the JSON fails
-        */
-       private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException {
-               JobCheckpointingSettings settings = graph.getJobCheckpointingSettings();
-
-               // check before allocating the generator so that the early return does not leave it unclosed
-               if (settings == null) {
-                       return "{}";
-               }
-
-               StringWriter writer = new StringWriter();
-
-               // try-with-resources closes the generator even if one of the writes fails
-               try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
-                       gen.writeStartObject();
-                       {
-                               gen.writeStringField("mode", settings.isExactlyOnce() ? "exactly_once" : "at_least_once");
-                               gen.writeNumberField("interval", settings.getCheckpointInterval());
-                               gen.writeNumberField("timeout", settings.getCheckpointTimeout());
-                               gen.writeNumberField("min_pause", settings.getMinPauseBetweenCheckpoints());
-                               gen.writeNumberField("max_concurrent", settings.getMaxConcurrentCheckpoints());
-
-                               ExternalizedCheckpointSettings externalization = settings.getExternalizedCheckpointSettings();
-                               gen.writeObjectFieldStart("externalization");
-                               {
-                                       boolean enabled = externalization.externalizeCheckpoints();
-                                       gen.writeBooleanField("enabled", enabled);
-                                       // the cancellation behaviour is only reported for externalized checkpoints
-                                       if (enabled) {
-                                               gen.writeBooleanField("delete_on_cancellation", externalization.deleteOnCancellation());
-                                       }
-                               }
-                               gen.writeEndObject();
-                       }
-                       gen.writeEndObject();
-               }
-
-               return writer.toString();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
deleted file mode 100644
index 974364d..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCache.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-
-import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
-import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
-
-import javax.annotation.Nullable;
-
-/**
- * A size-bounded cache for the stats of completed and failed checkpoints
- * that have been accessed.
- *
- * <p>Keeping accessed stats around improves the user experience quite a
- * bit, because they stay available and don't expire. For example, if you
- * manage to click on the last checkpoint in the history, it is no longer
- * available via the stats as soon as another checkpoint is triggered.
- * With the cache in place, the checkpoint can still be investigated.
- */
-public class CheckpointStatsCache {
-
-       /** Backing cache, or <code>null</code> if caching is disabled. */
-       @Nullable
-       private final Cache<Long, AbstractCheckpointStats> cache;
-
-       /**
-        * Creates the cache.
-        *
-        * @param maxNumEntries Maximum number of cached checkpoints; a value
-        *                      of zero or less disables the cache.
-        */
-       public CheckpointStatsCache(int maxNumEntries) {
-               Cache<Long, AbstractCheckpointStats> statsCache = null;
-               if (maxNumEntries > 0) {
-                       statsCache = CacheBuilder.<Long, AbstractCheckpointStats>newBuilder()
-                               .maximumSize(maxNumEntries)
-                               .build();
-               }
-               this.cache = statsCache;
-       }
-
-       /**
-        * Try to add the checkpoint to the cache.
-        *
-        * @param checkpoint Checkpoint to be added.
-        */
-       void tryAdd(AbstractCheckpointStats checkpoint) {
-               if (cache == null || checkpoint == null) {
-                       return;
-               }
-
-               // In progress checkpoints are skipped, because they will
-               // eventually be replaced by their completed/failed version.
-               if (checkpoint.getStatus().isInProgress()) {
-                       return;
-               }
-
-               cache.put(checkpoint.getCheckpointId(), checkpoint);
-       }
-
-       /**
-        * Try to look up a checkpoint by its ID in the cache.
-        *
-        * @param checkpointId ID of the checkpoint to look up.
-        * @return The checkpoint or <code>null</code> if checkpoint not found.
-        */
-       AbstractCheckpointStats tryGet(long checkpointId) {
-               return cache == null ? null : cache.getIfPresent(checkpointId);
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
deleted file mode 100644
index 96cc3e0..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.TaskStateStats;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import 
org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns the detailed stats of a single checkpoint of a job,
- * including a summary for each of its tasks (keyed by job vertex ID).
- */
-public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequestHandler {
-
-       private static final String CHECKPOINT_STATS_DETAILS_REST_PATH = "/jobs/:jobid/checkpoints/details/:checkpointid";
-
-       private final CheckpointStatsCache cache;
-
-       public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
-               super(executionGraphHolder, executor);
-               this.cache = cache;
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{CHECKPOINT_STATS_DETAILS_REST_PATH};
-       }
-
-       /**
-        * Looks up the requested checkpoint and renders its details as JSON on the handler's executor.
-        *
-        * <p>The checkpoint is first searched in the graph's checkpoint history; if it is found there
-        * it is offered to the cache, otherwise the cache is consulted as fallback.
-        *
-        * @param graph execution graph to take the checkpoint stats snapshot from
-        * @param params request parameters, expected to contain {@code checkpointid}
-        * @return future with the details JSON, or {@code "{}"} if the ID is missing/invalid, the graph
-        *         has no checkpoint stats snapshot, or the checkpoint is unknown
-        */
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionGraph graph, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               long checkpointId = parseCheckpointId(params);
-                               if (checkpointId == -1) {
-                                       return "{}";
-                               }
-
-                               CheckpointStatsSnapshot snapshot = graph.getCheckpointStatsSnapshot();
-                               if (snapshot == null) {
-                                       return "{}";
-                               }
-
-                               AbstractCheckpointStats checkpoint = snapshot.getHistory().getCheckpointById(checkpointId);
-
-                               if (checkpoint != null) {
-                                       cache.tryAdd(checkpoint);
-                               } else {
-                                       // no longer part of the history, fall back to the cache
-                                       checkpoint = cache.tryGet(checkpointId);
-
-                                       if (checkpoint == null) {
-                                               return "{}";
-                                       }
-                               }
-
-                               try {
-                                       return createCheckpointDetailsJson(checkpoint);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could not create checkpoint details json.", e);
-                               }
-                       },
-                       executor);
-       }
-
-       /**
-        * Archivist for the CheckpointStatsDetails.
-        */
-       public static class CheckpointStatsDetailsJsonArchivist implements JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot();
-                       if (stats == null) {
-                               return Collections.emptyList();
-                       }
-                       CheckpointStatsHistory history = stats.getHistory();
-                       List<ArchivedJson> archive = new ArrayList<>();
-                       // one archived JSON per checkpoint in the history, stored under its REST path
-                       for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) {
-                               String json = createCheckpointDetailsJson(checkpoint);
-                               String path = CHECKPOINT_STATS_DETAILS_REST_PATH
-                                       .replace(":jobid", graph.getJobID().toString())
-                                       .replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId()));
-                               archive.add(new ArchivedJson(path, json));
-                       }
-                       return archive;
-               }
-       }
-
-       /**
-        * Renders the details of the given checkpoint as JSON.
-        *
-        * <p>Besides the common fields, completed checkpoints additionally report
-        * {@code external_path} (if set) and {@code discarded}, failed checkpoints report
-        * {@code failure_timestamp} and {@code failure_message} (if set).
-        *
-        * @param checkpoint checkpoint stats to render
-        * @return the details as JSON object
-        * @throws IOException if writing the JSON fails
-        */
-       public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException {
-               StringWriter writer = new StringWriter();
-
-               // try-with-resources closes the generator even if one of the writes fails
-               try (JsonGenerator gen = JsonFactory.JACKSON_FACTORY.createGenerator(writer)) {
-                       gen.writeStartObject();
-
-                       gen.writeNumberField("id", checkpoint.getCheckpointId());
-                       gen.writeStringField("status", checkpoint.getStatus().toString());
-                       gen.writeBooleanField("is_savepoint", checkpoint.getProperties().isSavepoint());
-                       gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp());
-                       gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp());
-                       gen.writeNumberField("state_size", checkpoint.getStateSize());
-                       gen.writeNumberField("end_to_end_duration", checkpoint.getEndToEndDuration());
-                       gen.writeNumberField("alignment_buffered", checkpoint.getAlignmentBuffered());
-                       gen.writeNumberField("num_subtasks", checkpoint.getNumberOfSubtasks());
-                       gen.writeNumberField("num_acknowledged_subtasks", checkpoint.getNumberOfAcknowledgedSubtasks());
-
-                       if (checkpoint.getStatus().isCompleted()) {
-                               // --- Completed ---
-                               CompletedCheckpointStats completed = (CompletedCheckpointStats) checkpoint;
-
-                               String externalPath = completed.getExternalPath();
-                               if (externalPath != null) {
-                                       gen.writeStringField("external_path", externalPath);
-                               }
-
-                               gen.writeBooleanField("discarded", completed.isDiscarded());
-                       }
-                       else if (checkpoint.getStatus().isFailed()) {
-                               // --- Failed ---
-                               FailedCheckpointStats failed = (FailedCheckpointStats) checkpoint;
-
-                               gen.writeNumberField("failure_timestamp", failed.getFailureTimestamp());
-
-                               String failureMsg = failed.getFailureMessage();
-                               if (failureMsg != null) {
-                                       gen.writeStringField("failure_message", failureMsg);
-                               }
-                       }
-
-                       // per-task summary, keyed by the job vertex ID
-                       gen.writeObjectFieldStart("tasks");
-                       for (TaskStateStats taskStats : checkpoint.getAllTaskStateStats()) {
-                               gen.writeObjectFieldStart(taskStats.getJobVertexId().toString());
-
-                               gen.writeNumberField("latest_ack_timestamp", taskStats.getLatestAckTimestamp());
-                               gen.writeNumberField("state_size", taskStats.getStateSize());
-                               gen.writeNumberField("end_to_end_duration", taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
-                               gen.writeNumberField("alignment_buffered", taskStats.getAlignmentBuffered());
-                               gen.writeNumberField("num_subtasks", taskStats.getNumberOfSubtasks());
-                               gen.writeNumberField("num_acknowledged_subtasks", taskStats.getNumberOfAcknowledgedSubtasks());
-
-                               gen.writeEndObject();
-                       }
-                       gen.writeEndObject();
-
-                       gen.writeEndObject();
-               }
-
-               return writer.toString();
-       }
-
-       /**
-        * Returns the checkpoint ID parsed from the provided parameters.
-        *
-        * @param params Path parameters
-        * @return Parsed checkpoint ID or <code>-1</code> if the parameter is missing or not a valid long.
-        */
-       static long parseCheckpointId(Map<String, String> params) {
-               String param = params.get("checkpointid");
-               if (param == null) {
-                       return -1;
-               }
-
-               try {
-                       return Long.parseLong(param);
-               } catch (NumberFormatException ignored) {
-                       // a malformed ID is treated like a missing one
-                       return -1;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
deleted file mode 100644
index 045248b..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ /dev/null
@@ -1,234 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
-import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
-import org.apache.flink.runtime.checkpoint.TaskStateStats;
-import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import 
org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
-import 
org.apache.flink.runtime.webmonitor.handlers.AbstractJobVertexRequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-import static 
org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg;
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * Request handler that returns checkpoint stats for a single job vertex with
- * the summary stats and all subtasks.
- */
-public class CheckpointStatsDetailsSubtasksHandler extends 
AbstractExecutionGraphRequestHandler {
-
-       private static final String CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH 
= "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid";
-
-       private final CheckpointStatsCache cache;
-
-       public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
-               super(executionGraphHolder, executor);
-               this.cache = checkNotNull(cache);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new 
String[]{CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleJsonRequest(
-                       Map<String, String> pathParams,
-                       Map<String, String> queryParams,
-                       JobManagerGateway jobManagerGateway) {
-               return super.handleJsonRequest(pathParams, queryParams, 
jobManagerGateway);
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
-               long checkpointId = 
CheckpointStatsDetailsHandler.parseCheckpointId(params);
-               if (checkpointId == -1) {
-                       return CompletableFuture.completedFuture("{}");
-               }
-
-               JobVertexID vertexId = 
AbstractJobVertexRequestHandler.parseJobVertexId(params);
-               if (vertexId == null) {
-                       return CompletableFuture.completedFuture("{}");
-               }
-
-               CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
-               if (snapshot == null) {
-                       return CompletableFuture.completedFuture("{}");
-               }
-
-               AbstractCheckpointStats checkpoint = 
snapshot.getHistory().getCheckpointById(checkpointId);
-
-               if (checkpoint != null) {
-                       cache.tryAdd(checkpoint);
-               } else {
-                       checkpoint = cache.tryGet(checkpointId);
-
-                       if (checkpoint == null) {
-                               return CompletableFuture.completedFuture("{}");
-                       }
-               }
-
-               TaskStateStats taskStats = 
checkpoint.getTaskStateStats(vertexId);
-               if (taskStats == null) {
-                       return CompletableFuture.completedFuture("{}");
-               }
-
-               try {
-                       return 
CompletableFuture.completedFuture(createSubtaskCheckpointDetailsJson(checkpoint,
 taskStats));
-               } catch (IOException e) {
-                       return FutureUtils.completedExceptionally(e);
-               }
-       }
-
-       /**
-        * Archivist for the CheckpointStatsDetailsSubtasksHandler.
-        */
-       public static class CheckpointStatsDetailsSubtasksJsonArchivist 
implements JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       CheckpointStatsSnapshot stats = 
graph.getCheckpointStatsSnapshot();
-                       if (stats == null) {
-                               return Collections.emptyList();
-                       }
-                       CheckpointStatsHistory history = stats.getHistory();
-                       List<ArchivedJson> archive = new ArrayList<>();
-                       for (AbstractCheckpointStats checkpoint : 
history.getCheckpoints()) {
-                               for (TaskStateStats subtaskStats : 
checkpoint.getAllTaskStateStats()) {
-                                       String json = 
createSubtaskCheckpointDetailsJson(checkpoint, subtaskStats);
-                                       String path = 
CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH
-                                               .replace(":jobid", 
graph.getJobID().toString())
-                                               .replace(":checkpointid", 
String.valueOf(checkpoint.getCheckpointId()))
-                                               .replace(":vertexid", 
subtaskStats.getJobVertexId().toString());
-                                       archive.add(new ArchivedJson(path, 
json));
-                               }
-                       }
-                       return archive;
-               }
-       }
-
-       private static String 
createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, 
TaskStateStats taskStats) throws IOException {
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-               gen.writeStartObject();
-               // Overview
-               gen.writeNumberField("id", checkpoint.getCheckpointId());
-               gen.writeStringField("status", 
checkpoint.getStatus().toString());
-               gen.writeNumberField("latest_ack_timestamp", 
taskStats.getLatestAckTimestamp());
-               gen.writeNumberField("state_size", taskStats.getStateSize());
-               gen.writeNumberField("end_to_end_duration", 
taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
-               gen.writeNumberField("alignment_buffered", 
taskStats.getAlignmentBuffered());
-               gen.writeNumberField("num_subtasks", 
taskStats.getNumberOfSubtasks());
-               gen.writeNumberField("num_acknowledged_subtasks", 
taskStats.getNumberOfAcknowledgedSubtasks());
-
-               if (taskStats.getNumberOfAcknowledgedSubtasks() > 0) {
-                       gen.writeObjectFieldStart("summary");
-                       gen.writeObjectFieldStart("state_size");
-                       writeMinMaxAvg(gen, 
taskStats.getSummaryStats().getStateSizeStats());
-                       gen.writeEndObject();
-
-                       gen.writeObjectFieldStart("end_to_end_duration");
-                       MinMaxAvgStats ackTimestampStats = 
taskStats.getSummaryStats().getAckTimestampStats();
-                       gen.writeNumberField("min", Math.max(0, 
ackTimestampStats.getMinimum() - checkpoint.getTriggerTimestamp()));
-                       gen.writeNumberField("max", Math.max(0, 
ackTimestampStats.getMaximum() - checkpoint.getTriggerTimestamp()));
-                       gen.writeNumberField("avg", Math.max(0, 
ackTimestampStats.getAverage() - checkpoint.getTriggerTimestamp()));
-                       gen.writeEndObject();
-
-                       gen.writeObjectFieldStart("checkpoint_duration");
-                       gen.writeObjectFieldStart("sync");
-                       writeMinMaxAvg(gen, 
taskStats.getSummaryStats().getSyncCheckpointDurationStats());
-                       gen.writeEndObject();
-                       gen.writeObjectFieldStart("async");
-                       writeMinMaxAvg(gen, 
taskStats.getSummaryStats().getAsyncCheckpointDurationStats());
-                       gen.writeEndObject();
-                       gen.writeEndObject();
-
-                       gen.writeObjectFieldStart("alignment");
-                       gen.writeObjectFieldStart("buffered");
-                       writeMinMaxAvg(gen, 
taskStats.getSummaryStats().getAlignmentBufferedStats());
-                       gen.writeEndObject();
-                       gen.writeObjectFieldStart("duration");
-                       writeMinMaxAvg(gen, 
taskStats.getSummaryStats().getAlignmentDurationStats());
-                       gen.writeEndObject();
-                       gen.writeEndObject();
-                       gen.writeEndObject();
-               }
-
-               SubtaskStateStats[] subtasks = taskStats.getSubtaskStats();
-
-               gen.writeArrayFieldStart("subtasks");
-               for (int i = 0; i < subtasks.length; i++) {
-                       SubtaskStateStats subtask = subtasks[i];
-
-                       gen.writeStartObject();
-                       gen.writeNumberField("index", i);
-
-                       if (subtask != null) {
-                               gen.writeStringField("status", "completed");
-                               gen.writeNumberField("ack_timestamp", 
subtask.getAckTimestamp());
-                               gen.writeNumberField("end_to_end_duration", 
subtask.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
-                               gen.writeNumberField("state_size", 
subtask.getStateSize());
-
-                               gen.writeObjectFieldStart("checkpoint");
-                               gen.writeNumberField("sync", 
subtask.getSyncCheckpointDuration());
-                               gen.writeNumberField("async", 
subtask.getAsyncCheckpointDuration());
-                               gen.writeEndObject();
-
-                               gen.writeObjectFieldStart("alignment");
-                               gen.writeNumberField("buffered", 
subtask.getAlignmentBuffered());
-                               gen.writeNumberField("duration", 
subtask.getAlignmentDuration());
-                               gen.writeEndObject();
-                       } else {
-                               gen.writeStringField("status", 
"pending_or_failed");
-                       }
-                       gen.writeEndObject();
-               }
-               gen.writeEndArray();
-
-               gen.writeEndObject();
-               gen.close();
-
-               return writer.toString();
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
deleted file mode 100644
index a60aee0..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
-import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
-import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import 
org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import javax.annotation.Nullable;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Handler that returns checkpoint statistics for a job.
- */
-public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler {
-
-       private static final String CHECKPOINT_STATS_REST_PATH = 
"/jobs/:jobid/checkpoints";
-
-       public CheckpointStatsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
-               super(executionGraphHolder, executor);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{CHECKPOINT_STATS_REST_PATH};
-       }
-
-       @Override
-       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               try {
-                                       return createCheckpointStatsJson(graph);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could 
not create checkpoint stats json.", e);
-                               }
-                       },
-                       executor);
-       }
-
-       /**
-        * Archivist for the CheckpointStatsJsonHandler.
-        */
-       public static class CheckpointStatsJsonArchivist implements 
JsonArchivist {
-
-               @Override
-               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
-                       String json = createCheckpointStatsJson(graph);
-                       String path = CHECKPOINT_STATS_REST_PATH
-                               .replace(":jobid", graph.getJobID().toString());
-                       return Collections.singletonList(new ArchivedJson(path, 
json));
-               }
-       }
-
-       private static String createCheckpointStatsJson(AccessExecutionGraph 
graph) throws IOException {
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-               CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
-               if (snapshot == null) {
-                       return "{}";
-               }
-
-               gen.writeStartObject();
-
-               // Counts
-               writeCounts(gen, snapshot.getCounts());
-
-               // Summary
-               writeSummary(gen, snapshot.getSummaryStats());
-
-               CheckpointStatsHistory history = snapshot.getHistory();
-
-               // Latest
-               writeLatestCheckpoints(
-                       gen,
-                       history.getLatestCompletedCheckpoint(),
-                       history.getLatestSavepoint(),
-                       history.getLatestFailedCheckpoint(),
-                       snapshot.getLatestRestoredCheckpoint());
-
-               // History
-               writeHistory(gen, snapshot.getHistory());
-
-               gen.writeEndObject();
-               gen.close();
-
-               return writer.toString();
-       }
-
-       private static void writeCounts(JsonGenerator gen, 
CheckpointStatsCounts counts) throws IOException {
-               gen.writeObjectFieldStart("counts");
-               gen.writeNumberField("restored", 
counts.getNumberOfRestoredCheckpoints());
-               gen.writeNumberField("total", 
counts.getTotalNumberOfCheckpoints());
-               gen.writeNumberField("in_progress", 
counts.getNumberOfInProgressCheckpoints());
-               gen.writeNumberField("completed", 
counts.getNumberOfCompletedCheckpoints());
-               gen.writeNumberField("failed", 
counts.getNumberOfFailedCheckpoints());
-               gen.writeEndObject();
-       }
-
-       private static void writeSummary(
-               JsonGenerator gen,
-               CompletedCheckpointStatsSummary summary) throws IOException {
-               gen.writeObjectFieldStart("summary");
-               gen.writeObjectFieldStart("state_size");
-               writeMinMaxAvg(gen, summary.getStateSizeStats());
-               gen.writeEndObject();
-
-               gen.writeObjectFieldStart("end_to_end_duration");
-               writeMinMaxAvg(gen, summary.getEndToEndDurationStats());
-               gen.writeEndObject();
-
-               gen.writeObjectFieldStart("alignment_buffered");
-               writeMinMaxAvg(gen, summary.getAlignmentBufferedStats());
-               gen.writeEndObject();
-               gen.writeEndObject();
-       }
-
-       static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) 
throws IOException {
-               gen.writeNumberField("min", minMaxAvg.getMinimum());
-               gen.writeNumberField("max", minMaxAvg.getMaximum());
-               gen.writeNumberField("avg", minMaxAvg.getAverage());
-       }
-
-       private static void writeLatestCheckpoints(
-               JsonGenerator gen,
-               @Nullable CompletedCheckpointStats completed,
-               @Nullable CompletedCheckpointStats savepoint,
-               @Nullable FailedCheckpointStats failed,
-               @Nullable RestoredCheckpointStats restored) throws IOException {
-
-               gen.writeObjectFieldStart("latest");
-               // Completed checkpoint
-               if (completed != null) {
-                       gen.writeObjectFieldStart("completed");
-                       writeCheckpoint(gen, completed);
-
-                       String externalPath = completed.getExternalPath();
-                       if (externalPath != null) {
-                               gen.writeStringField("external_path", 
completed.getExternalPath());
-                       }
-
-                       gen.writeEndObject();
-               }
-
-               // Completed savepoint
-               if (savepoint != null) {
-                       gen.writeObjectFieldStart("savepoint");
-                       writeCheckpoint(gen, savepoint);
-
-                       String externalPath = savepoint.getExternalPath();
-                       if (externalPath != null) {
-                               gen.writeStringField("external_path", 
savepoint.getExternalPath());
-                       }
-                       gen.writeEndObject();
-               }
-
-               // Failed checkpoint
-               if (failed != null) {
-                       gen.writeObjectFieldStart("failed");
-                       writeCheckpoint(gen, failed);
-
-                       gen.writeNumberField("failure_timestamp", 
failed.getFailureTimestamp());
-                       String failureMsg = failed.getFailureMessage();
-                       if (failureMsg != null) {
-                               gen.writeStringField("failure_message", 
failureMsg);
-                       }
-                       gen.writeEndObject();
-               }
-
-               // Restored checkpoint
-               if (restored != null) {
-                       gen.writeObjectFieldStart("restored");
-                       gen.writeNumberField("id", restored.getCheckpointId());
-                       gen.writeNumberField("restore_timestamp", 
restored.getRestoreTimestamp());
-                       gen.writeBooleanField("is_savepoint", 
restored.getProperties().isSavepoint());
-
-                       String externalPath = restored.getExternalPath();
-                       if (externalPath != null) {
-                               gen.writeStringField("external_path", 
externalPath);
-                       }
-                       gen.writeEndObject();
-               }
-               gen.writeEndObject();
-       }
-
-       private static void writeCheckpoint(JsonGenerator gen, 
AbstractCheckpointStats checkpoint) throws IOException {
-               gen.writeNumberField("id", checkpoint.getCheckpointId());
-               gen.writeNumberField("trigger_timestamp", 
checkpoint.getTriggerTimestamp());
-               gen.writeNumberField("latest_ack_timestamp", 
checkpoint.getLatestAckTimestamp());
-               gen.writeNumberField("state_size", checkpoint.getStateSize());
-               gen.writeNumberField("end_to_end_duration", 
checkpoint.getEndToEndDuration());
-               gen.writeNumberField("alignment_buffered", 
checkpoint.getAlignmentBuffered());
-
-       }
-
-       private static void writeHistory(JsonGenerator gen, 
CheckpointStatsHistory history) throws IOException {
-               gen.writeArrayFieldStart("history");
-               for (AbstractCheckpointStats checkpoint : 
history.getCheckpoints()) {
-                       gen.writeStartObject();
-                       gen.writeNumberField("id", 
checkpoint.getCheckpointId());
-                       gen.writeStringField("status", 
checkpoint.getStatus().toString());
-                       gen.writeBooleanField("is_savepoint", 
checkpoint.getProperties().isSavepoint());
-                       gen.writeNumberField("trigger_timestamp", 
checkpoint.getTriggerTimestamp());
-                       gen.writeNumberField("latest_ack_timestamp", 
checkpoint.getLatestAckTimestamp());
-                       gen.writeNumberField("state_size", 
checkpoint.getStateSize());
-                       gen.writeNumberField("end_to_end_duration", 
checkpoint.getEndToEndDuration());
-                       gen.writeNumberField("alignment_buffered", 
checkpoint.getAlignmentBuffered());
-                       gen.writeNumberField("num_subtasks", 
checkpoint.getNumberOfSubtasks());
-                       gen.writeNumberField("num_acknowledged_subtasks", 
checkpoint.getNumberOfAcknowledgedSubtasks());
-
-                       if (checkpoint.getStatus().isCompleted()) {
-                               // --- Completed ---
-                               CompletedCheckpointStats completed = 
(CompletedCheckpointStats) checkpoint;
-
-                               String externalPath = 
completed.getExternalPath();
-                               if (externalPath != null) {
-                                       gen.writeStringField("external_path", 
externalPath);
-                               }
-
-                               gen.writeBooleanField("discarded", 
completed.isDiscarded());
-                       }
-                       else if (checkpoint.getStatus().isFailed()) {
-                               // --- Failed ---
-                               FailedCheckpointStats failed = 
(FailedCheckpointStats) checkpoint;
-
-                               gen.writeNumberField("failure_timestamp", 
failed.getFailureTimestamp());
-
-                               String failureMsg = failed.getFailureMessage();
-                               if (failureMsg != null) {
-                                       gen.writeStringField("failure_message", 
failureMsg);
-                               }
-                       }
-
-                       gen.writeEndObject();
-               }
-               gen.writeEndArray();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
index 6da8115..01228d5 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServer.java
@@ -27,9 +27,9 @@ import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.history.FsJobArchivist;
 import org.apache.flink.runtime.net.SSLUtils;
+import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
 import org.apache.flink.runtime.security.SecurityUtils;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
-import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;
 import org.apache.flink.runtime.webmonitor.utils.WebFrontendBootstrap;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.FlinkException;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
index 0fc4314..bae8e21 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerArchiveFetcher.java
@@ -24,8 +24,8 @@ import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.history.FsJobArchivist;
+import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
-import org.apache.flink.runtime.webmonitor.handlers.CurrentJobsOverviewHandler;
 import org.apache.flink.util.FileUtils;
 
 import com.fasterxml.jackson.core.JsonFactory;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
index c5943dc..12a27a7 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/history/HistoryServerStaticFileServerHandler.java
@@ -26,7 +26,7 @@ package org.apache.flink.runtime.webmonitor.history;
  * 
https://github.com/netty/netty/blob/4.0/example/src/main/java/io/netty/example/http/file/HttpStaticFileServerHandler.java
  *****************************************************************************/
 
-import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.files.StaticFileServerHandler;
 
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFutureListener;

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
deleted file mode 100644
index cf286ce..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/AbstractMetricsHandler.java
+++ /dev/null
@@ -1,139 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.concurrent.FlinkFutureException;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.webmonitor.handlers.AbstractJsonRequestHandler;
-import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
-import org.apache.flink.util.Preconditions;
-
-import com.fasterxml.jackson.core.JsonGenerator;
-
-import java.io.IOException;
-import java.io.StringWriter;
-import java.util.Map;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-
-/**
- * Abstract request handler that returns a list of all available metrics or 
the values for a set of metrics.
- *
- * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
- * {@code [ { "id" : "X" } ] }
- *
- * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
- * {@code /get?X,Y}
- * The handler will then return a list containing the values of the requested 
metrics.
- * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
- */
-public abstract class AbstractMetricsHandler extends 
AbstractJsonRequestHandler {
-       private final MetricFetcher fetcher;
-
-       public AbstractMetricsHandler(Executor executor, MetricFetcher fetcher) 
{
-               super(executor);
-               this.fetcher = Preconditions.checkNotNull(fetcher);
-       }
-
-       @Override
-       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
-               return CompletableFuture.supplyAsync(
-                       () -> {
-                               fetcher.update();
-                               String requestedMetricsList = 
queryParams.get("get");
-                               try {
-                                       return requestedMetricsList != null
-                                               ? getMetricsValues(pathParams, 
requestedMetricsList)
-                                               : 
getAvailableMetricsList(pathParams);
-                               } catch (IOException e) {
-                                       throw new FlinkFutureException("Could 
not retrieve metrics.", e);
-                               }
-                       },
-                       executor);
-
-       }
-
-       /**
-        * Returns a Map containing the metrics belonging to the entity pointed 
to by the path parameters.
-        *
-        * @param pathParams REST path parameters
-        * @param metrics MetricStore containing all metrics
-        * @return Map containing metrics, or null if no metric exists
-        */
-       protected abstract Map<String, String> getMapFor(Map<String, String> 
pathParams, MetricStore metrics);
-
-       private String getMetricsValues(Map<String, String> pathParams, String 
requestedMetricsList) throws IOException {
-               if (requestedMetricsList.isEmpty()) {
-                       /*
-                        * The WebInterface doesn't check whether the list of 
available metrics was empty. This can lead to a
-                        * request for which the "get" parameter is an empty 
string.
-                        */
-                       return "";
-               }
-               MetricStore metricStore = fetcher.getMetricStore();
-               synchronized (metricStore) {
-                       Map<String, String> metrics = getMapFor(pathParams, 
metricStore);
-                       if (metrics == null) {
-                               return "";
-                       }
-                       String[] requestedMetrics = 
requestedMetricsList.split(",");
-
-                       StringWriter writer = new StringWriter();
-                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-                       gen.writeStartArray();
-                       for (String requestedMetric : requestedMetrics) {
-                               Object metricValue = 
metrics.get(requestedMetric);
-                               if (metricValue != null) {
-                                       gen.writeStartObject();
-                                       gen.writeStringField("id", 
requestedMetric);
-                                       gen.writeStringField("value", 
metricValue.toString());
-                                       gen.writeEndObject();
-                               }
-                       }
-                       gen.writeEndArray();
-
-                       gen.close();
-                       return writer.toString();
-               }
-       }
-
-       private String getAvailableMetricsList(Map<String, String> pathParams) 
throws IOException {
-               MetricStore metricStore = fetcher.getMetricStore();
-               synchronized (metricStore) {
-                       Map<String, String> metrics = getMapFor(pathParams, 
metricStore);
-                       if (metrics == null) {
-                               return "";
-                       }
-                       StringWriter writer = new StringWriter();
-                       JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
-
-                       gen.writeStartArray();
-                       for (String m : metrics.keySet()) {
-                               gen.writeStartObject();
-                               gen.writeStringField("id", m);
-                               gen.writeEndObject();
-                       }
-                       gen.writeEndArray();
-
-                       gen.close();
-                       return writer.toString();
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
deleted file mode 100644
index 2bd6683..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobManagerMetricsHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns for the job manager a list of all available 
metrics or the values for a set of metrics.
- *
- * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
- * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
- *
- * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
- * {@code /get?X,Y}
- * The handler will then return a list containing the values of the requested 
metrics.
- * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
- */
-public class JobManagerMetricsHandler extends AbstractMetricsHandler {
-
-       private static final String JOBMANAGER_METRICS_REST_PATH = 
"/jobmanager/metrics";
-
-       public JobManagerMetricsHandler(Executor executor, MetricFetcher 
fetcher) {
-               super(executor, fetcher);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOBMANAGER_METRICS_REST_PATH};
-       }
-
-       @Override
-       protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
-               MetricStore.JobManagerMetricStore jobManager = 
metrics.getJobManagerMetricStore();
-               if (jobManager == null) {
-                       return null;
-               } else {
-                       return jobManager.metrics;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
deleted file mode 100644
index e5e2500..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobMetricsHandler.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns for a given job a list of all available 
metrics or the values for a set of metrics.
- *
- * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
- * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
- *
- * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
- * {@code /get?X,Y}
- * The handler will then return a list containing the values of the requested 
metrics.
- * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
- */
-public class JobMetricsHandler extends AbstractMetricsHandler {
-       public static final String PARAMETER_JOB_ID = "jobid";
-       private static final String JOB_METRICS_REST_PATH = 
"/jobs/:jobid/metrics";
-
-       public JobMetricsHandler(Executor executor, MetricFetcher fetcher) {
-               super(executor, fetcher);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOB_METRICS_REST_PATH};
-       }
-
-       @Override
-       protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
-               MetricStore.JobMetricStore job = 
metrics.getJobMetricStore(pathParams.get(PARAMETER_JOB_ID));
-               return job != null
-                       ? job.metrics
-                       : null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
deleted file mode 100644
index 1d2cd84..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/JobVertexMetricsHandler.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import java.util.Map;
-import java.util.concurrent.Executor;
-
-/**
- * Request handler that returns for a given task a list of all available 
metrics or the values for a set of metrics.
- *
- * <p>If the query parameters do not contain a "get" parameter the list of all 
metrics is returned.
- * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
- *
- * <p>If the query parameters do contain a "get" parameter a comma-separate 
list of metric names is expected as a value.
- * {@code /get?X,Y}
- * The handler will then return a list containing the values of the requested 
metrics.
- * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] }
- */
-public class JobVertexMetricsHandler extends AbstractMetricsHandler {
-       public static final String PARAMETER_VERTEX_ID = "vertexid";
-       private static final String JOB_VERTEX_METRICS_REST_PATH = 
"/jobs/:jobid/vertices/:vertexid/metrics";
-
-       public JobVertexMetricsHandler(Executor executor, MetricFetcher 
fetcher) {
-               super(executor, fetcher);
-       }
-
-       @Override
-       public String[] getPaths() {
-               return new String[]{JOB_VERTEX_METRICS_REST_PATH};
-       }
-
-       @Override
-       protected Map<String, String> getMapFor(Map<String, String> pathParams, 
MetricStore metrics) {
-               MetricStore.TaskMetricStore task = metrics.getTaskMetricStore(
-                       pathParams.get(JobMetricsHandler.PARAMETER_JOB_ID),
-                       pathParams.get(PARAMETER_VERTEX_ID));
-               return task != null
-                       ? task.metrics
-                       : null;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
deleted file mode 100644
index a5f4ca5..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricFetcher.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.messages.webmonitor.JobDetails;
-import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
-import org.apache.flink.runtime.metrics.dump.MetricDump;
-import org.apache.flink.runtime.metrics.dump.MetricDumpSerialization;
-import org.apache.flink.runtime.metrics.dump.MetricQueryService;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceGateway;
-import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
-import org.apache.flink.util.Preconditions;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Executor;
-import java.util.stream.Collectors;
-
-import static 
org.apache.flink.runtime.metrics.dump.MetricDumpSerialization.MetricDumpDeserializer;
-
-/**
- * The MetricFetcher can be used to fetch metrics from the JobManager and all 
registered TaskManagers.
- *
- * <p>Metrics will only be fetched when {@link MetricFetcher#update()} is 
called, provided that a sufficient time since
- * the last call has passed.
- */
-public class MetricFetcher {
-       private static final Logger LOG = 
LoggerFactory.getLogger(MetricFetcher.class);
-
-       private final GatewayRetriever<JobManagerGateway> retriever;
-       private final MetricQueryServiceRetriever queryServiceRetriever;
-       private final Executor executor;
-       private final Time timeout;
-
-       private final MetricStore metrics = new MetricStore();
-       private final MetricDumpDeserializer deserializer = new 
MetricDumpDeserializer();
-
-       private long lastUpdateTime;
-
-       public MetricFetcher(
-                       GatewayRetriever<JobManagerGateway> retriever,
-                       MetricQueryServiceRetriever queryServiceRetriever,
-                       Executor executor,
-                       Time timeout) {
-               this.retriever = Preconditions.checkNotNull(retriever);
-               this.queryServiceRetriever = 
Preconditions.checkNotNull(queryServiceRetriever);
-               this.executor = Preconditions.checkNotNull(executor);
-               this.timeout = Preconditions.checkNotNull(timeout);
-       }
-
-       /**
-        * Returns the MetricStore containing all stored metrics.
-        *
-        * @return MetricStore containing all stored metrics;
-        */
-       public MetricStore getMetricStore() {
-               return metrics;
-       }
-
-       /**
-        * This method can be used to signal this MetricFetcher that the 
metrics are still in use and should be updated.
-        */
-       public void update() {
-               synchronized (this) {
-                       long currentTime = System.currentTimeMillis();
-                       if (currentTime - lastUpdateTime > 10000) { // 10 
seconds have passed since the last update
-                               lastUpdateTime = currentTime;
-                               fetchMetrics();
-                       }
-               }
-       }
-
-       private void fetchMetrics() {
-               try {
-                       Optional<JobManagerGateway> optJobManagerGateway = 
retriever.getNow();
-                       if (optJobManagerGateway.isPresent()) {
-                               final JobManagerGateway jobManagerGateway = 
optJobManagerGateway.get();
-
-                               /**
-                                * Remove all metrics that belong to a job that 
is not running and no longer archived.
-                                */
-                               CompletableFuture<MultipleJobsDetails> 
jobDetailsFuture = jobManagerGateway.requestJobDetails(true, true, timeout);
-
-                               jobDetailsFuture.whenCompleteAsync(
-                                       (MultipleJobsDetails jobDetails, 
Throwable throwable) -> {
-                                               if (throwable != null) {
-                                                       LOG.debug("Fetching of 
JobDetails failed.", throwable);
-                                               } else {
-                                                       ArrayList<String> 
toRetain = new ArrayList<>();
-                                                       for (JobDetails job : 
jobDetails.getRunningJobs()) {
-                                                               
toRetain.add(job.getJobId().toString());
-                                                       }
-                                                       for (JobDetails job : 
jobDetails.getFinishedJobs()) {
-                                                               
toRetain.add(job.getJobId().toString());
-                                                       }
-                                                       synchronized (metrics) {
-                                                               
metrics.jobs.keySet().retainAll(toRetain);
-                                                       }
-                                               }
-                                       },
-                                       executor);
-
-                               String jobManagerPath = 
jobManagerGateway.getAddress();
-                               String jmQueryServicePath = 
jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME;
-
-                               retrieveAndQueryMetrics(jmQueryServicePath);
-
-                               /**
-                                * We first request the list of all registered 
task managers from the job manager, and then
-                                * request the respective metric dump from each 
task manager.
-                                *
-                                * <p>All stored metrics that do not belong to 
a registered task manager will be removed.
-                                */
-                               CompletableFuture<Collection<Instance>> 
taskManagersFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
-
-                               taskManagersFuture.whenCompleteAsync(
-                                       (Collection<Instance> taskManagers, 
Throwable throwable) -> {
-                                               if (throwable != null) {
-                                                       LOG.debug("Fetching 
list of registered TaskManagers failed.", throwable);
-                                               } else {
-                                                       List<String> 
activeTaskManagers = taskManagers.stream().map(
-                                                               
taskManagerInstance -> {
-                                                                       final 
String taskManagerAddress = 
taskManagerInstance.getTaskManagerGateway().getAddress();
-                                                                       final 
String tmQueryServicePath = taskManagerAddress.substring(0, 
taskManagerAddress.lastIndexOf('/') + 1) + 
MetricQueryService.METRIC_QUERY_SERVICE_NAME + "_" + 
taskManagerInstance.getTaskManagerID().getResourceIdString();
-
-                                                                       
retrieveAndQueryMetrics(tmQueryServicePath);
-
-                                                                       return 
taskManagerInstance.getId().toString();
-                                                               
}).collect(Collectors.toList());
-
-                                                       synchronized (metrics) {
-                                                               
metrics.taskManagers.keySet().retainAll(activeTaskManagers);
-                                                       }
-                                               }
-                                       },
-                                       executor);
-                       }
-               } catch (Exception e) {
-                       LOG.warn("Exception while fetching metrics.", e);
-               }
-       }
-
-       /**
-        * Retrieves and queries the specified QueryServiceGateway.
-        *
-        * @param queryServicePath specifying the QueryServiceGateway
-        */
-       private void retrieveAndQueryMetrics(String queryServicePath) {
-               final CompletableFuture<MetricQueryServiceGateway> 
queryServiceGatewayFuture = 
queryServiceRetriever.retrieveService(queryServicePath);
-
-               queryServiceGatewayFuture.whenCompleteAsync(
-                       (MetricQueryServiceGateway queryServiceGateway, 
Throwable t) -> {
-                               if (t != null) {
-                                       LOG.debug("Could not retrieve 
QueryServiceGateway.", t);
-                               } else {
-                                       queryMetrics(queryServiceGateway);
-                               }
-                       },
-                       executor);
-       }
-
-       /**
-        * Query the metrics from the given QueryServiceGateway.
-        *
-        * @param queryServiceGateway to query for metrics
-        */
-       private void queryMetrics(final MetricQueryServiceGateway 
queryServiceGateway) {
-               queryServiceGateway
-                       .queryMetrics(timeout)
-                       .whenCompleteAsync(
-                               
(MetricDumpSerialization.MetricSerializationResult result, Throwable t) -> {
-                                       if (t != null) {
-                                               LOG.debug("Fetching metrics 
failed.", t);
-                                       } else {
-                                               List<MetricDump> dumpedMetrics 
= deserializer.deserialize(result);
-                                               synchronized (metrics) {
-                                                       for (MetricDump metric 
: dumpedMetrics) {
-                                                               
metrics.add(metric);
-                                                       }
-                                               }
-                                       }
-                               },
-                               executor);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
deleted file mode 100644
index e36dca8..0000000
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java
+++ /dev/null
@@ -1,305 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.metrics;
-
-import org.apache.flink.runtime.metrics.dump.MetricDump;
-import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_COUNTER;
-import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_GAUGE;
-import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_HISTOGRAM;
-import static 
org.apache.flink.runtime.metrics.dump.MetricDump.METRIC_CATEGORY_METER;
-import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM;
-import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB;
-import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR;
-import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK;
-import static 
org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM;
-
-/**
- * Nested data-structure to store metrics.
- *
- * <p>This structure is not thread-safe.
- */
-public class MetricStore {
-       private static final Logger LOG = 
LoggerFactory.getLogger(MetricStore.class);
-
-       final JobManagerMetricStore jobManager = new JobManagerMetricStore();
-       final Map<String, TaskManagerMetricStore> taskManagers = new 
HashMap<>();
-       final Map<String, JobMetricStore> jobs = new HashMap<>();
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-       // Adding metrics
-       // 
-----------------------------------------------------------------------------------------------------------------
-       public void add(MetricDump metric) {
-               try {
-                       QueryScopeInfo info = metric.scopeInfo;
-                       TaskManagerMetricStore tm;
-                       JobMetricStore job;
-                       TaskMetricStore task;
-                       SubtaskMetricStore subtask;
-
-                       String name = info.scope.isEmpty()
-                               ? metric.name
-                               : info.scope + "." + metric.name;
-
-                       if (name.isEmpty()) { // malformed transmission
-                               return;
-                       }
-
-                       switch (info.getCategory()) {
-                               case INFO_CATEGORY_JM:
-                                       addMetric(jobManager.metrics, name, 
metric);
-                                       break;
-                               case INFO_CATEGORY_TM:
-                                       String tmID = 
((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID;
-                                       tm = taskManagers.get(tmID);
-                                       if (tm == null) {
-                                               tm = new 
TaskManagerMetricStore();
-                                               taskManagers.put(tmID, tm);
-                                       }
-                                       if (name.contains("GarbageCollector")) {
-                                               String gcName = 
name.substring("Status.JVM.GarbageCollector.".length(), name.lastIndexOf('.'));
-                                               
tm.addGarbageCollectorName(gcName);
-                                       }
-                                       addMetric(tm.metrics, name, metric);
-                                       break;
-                               case INFO_CATEGORY_JOB:
-                                       QueryScopeInfo.JobQueryScopeInfo 
jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info;
-                                       job = jobs.get(jobInfo.jobID);
-                                       if (job == null) {
-                                               job = new JobMetricStore();
-                                               jobs.put(jobInfo.jobID, job);
-                                       }
-                                       addMetric(job.metrics, name, metric);
-                                       break;
-                               case INFO_CATEGORY_TASK:
-                                       QueryScopeInfo.TaskQueryScopeInfo 
taskInfo = (QueryScopeInfo.TaskQueryScopeInfo) info;
-                                       job = jobs.get(taskInfo.jobID);
-                                       if (job == null) {
-                                               job = new JobMetricStore();
-                                               jobs.put(taskInfo.jobID, job);
-                                       }
-                                       task = job.tasks.get(taskInfo.vertexID);
-                                       if (task == null) {
-                                               task = new TaskMetricStore();
-                                               
job.tasks.put(taskInfo.vertexID, task);
-                                       }
-                                       subtask = 
task.subtasks.get(taskInfo.subtaskIndex);
-                                       if (subtask == null) {
-                                               subtask = new 
SubtaskMetricStore();
-                                               
task.subtasks.put(taskInfo.subtaskIndex, subtask);
-                                       }
-                                       /**
-                                        * The duplication is intended. Metrics 
scoped by subtask are useful for several job/task handlers,
-                                        * while the WebInterface task metric 
queries currently do not account for subtasks, so we don't
-                                        * divide by subtask and instead use 
the concatenation of subtask index and metric name as the name
-                                        * for those.
-                                        */
-                                       addMetric(subtask.metrics, name, 
metric);
-                                       addMetric(task.metrics, 
taskInfo.subtaskIndex + "." + name, metric);
-                                       break;
-                               case INFO_CATEGORY_OPERATOR:
-                                       QueryScopeInfo.OperatorQueryScopeInfo 
operatorInfo = (QueryScopeInfo.OperatorQueryScopeInfo) info;
-                                       job = jobs.get(operatorInfo.jobID);
-                                       if (job == null) {
-                                               job = new JobMetricStore();
-                                               jobs.put(operatorInfo.jobID, 
job);
-                                       }
-                                       task = 
job.tasks.get(operatorInfo.vertexID);
-                                       if (task == null) {
-                                               task = new TaskMetricStore();
-                                               
job.tasks.put(operatorInfo.vertexID, task);
-                                       }
-                                       /**
-                                        * As the WebInterface does not account 
for operators (because it can't) we don't
-                                        * divide by operator and instead use 
the concatenation of subtask index, operator name and metric name
-                                        * as the name.
-                                        */
-                                       addMetric(task.metrics, 
operatorInfo.subtaskIndex + "." + operatorInfo.operatorName + "." + name, 
metric);
-                                       break;
-                               default:
-                                       LOG.debug("Invalid metric dump 
category: " + info.getCategory());
-                       }
-               } catch (Exception e) {
-                       LOG.debug("Malformed metric dump.", e);
-               }
-       }
-
-       private void addMetric(Map<String, String> target, String name, 
MetricDump metric) {
-               switch (metric.getCategory()) {
-                       case METRIC_CATEGORY_COUNTER:
-                               MetricDump.CounterDump counter = 
(MetricDump.CounterDump) metric;
-                               target.put(name, String.valueOf(counter.count));
-                               break;
-                       case METRIC_CATEGORY_GAUGE:
-                               MetricDump.GaugeDump gauge = 
(MetricDump.GaugeDump) metric;
-                               target.put(name, gauge.value);
-                               break;
-                       case METRIC_CATEGORY_HISTOGRAM:
-                               MetricDump.HistogramDump histogram = 
(MetricDump.HistogramDump) metric;
-                               target.put(name + "_min", 
String.valueOf(histogram.min));
-                               target.put(name + "_max", 
String.valueOf(histogram.max));
-                               target.put(name + "_mean", 
String.valueOf(histogram.mean));
-                               target.put(name + "_median", 
String.valueOf(histogram.median));
-                               target.put(name + "_stddev", 
String.valueOf(histogram.stddev));
-                               target.put(name + "_p75", 
String.valueOf(histogram.p75));
-                               target.put(name + "_p90", 
String.valueOf(histogram.p90));
-                               target.put(name + "_p95", 
String.valueOf(histogram.p95));
-                               target.put(name + "_p98", 
String.valueOf(histogram.p98));
-                               target.put(name + "_p99", 
String.valueOf(histogram.p99));
-                               target.put(name + "_p999", 
String.valueOf(histogram.p999));
-                               break;
-                       case METRIC_CATEGORY_METER:
-                               MetricDump.MeterDump meter = 
(MetricDump.MeterDump) metric;
-                               target.put(name, String.valueOf(meter.rate));
-                               break;
-               }
-       }
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-       // Accessors for sub MetricStores
-       // 
-----------------------------------------------------------------------------------------------------------------
-
-       /**
-        * Returns the {@link JobManagerMetricStore}.
-        *
-        * @return JobManagerMetricStore
-        */
-       public JobManagerMetricStore getJobManagerMetricStore() {
-               return jobManager;
-       }
-
-       /**
-        * Returns the {@link TaskManagerMetricStore} for the given taskmanager 
ID.
-        *
-        * @param tmID taskmanager ID
-        * @return TaskManagerMetricStore for the given ID, or null if no store 
for the given argument exists
-        */
-       public TaskManagerMetricStore getTaskManagerMetricStore(String tmID) {
-               return taskManagers.get(tmID);
-       }
-
-       /**
-        * Returns the {@link JobMetricStore} for the given job ID.
-        *
-        * @param jobID job ID
-        * @return JobMetricStore for the given ID, or null if no store for the 
given argument exists
-        */
-       public JobMetricStore getJobMetricStore(String jobID) {
-               return jobs.get(jobID);
-       }
-
-       /**
-        * Returns the {@link TaskMetricStore} for the given job/task ID.
-        *
-        * @param jobID  job ID
-        * @param taskID task ID
-        * @return TaskMetricStore for given IDs, or null if no store for the 
given arguments exists
-        */
-       public TaskMetricStore getTaskMetricStore(String jobID, String taskID) {
-               JobMetricStore job = getJobMetricStore(jobID);
-               if (job == null) {
-                       return null;
-               }
-               return job.getTaskMetricStore(taskID);
-       }
-
-       /**
-        * Returns the {@link SubtaskMetricStore} for the given job/task ID and 
subtask index.
-        *
-        * @param jobID        job ID
-        * @param taskID       task ID
-        * @param subtaskIndex subtask index
-        * @return SubtaskMetricStore for the given IDs and index, or null if 
no store for the given arguments exists
-        */
-       public SubtaskMetricStore getSubtaskMetricStore(String jobID, String 
taskID, int subtaskIndex) {
-               TaskMetricStore task = getTaskMetricStore(jobID, taskID);
-               if (task == null) {
-                       return null;
-               }
-               return task.getSubtaskMetricStore(subtaskIndex);
-       }
-
-       // 
-----------------------------------------------------------------------------------------------------------------
-       // sub MetricStore classes
-       // 
-----------------------------------------------------------------------------------------------------------------
-       private abstract static class ComponentMetricStore {
-               public final Map<String, String> metrics = new HashMap<>();
-
-               public String getMetric(String name, String defaultValue) {
-                       String value = this.metrics.get(name);
-                       return value != null
-                               ? value
-                               : defaultValue;
-               }
-       }
-
-       /**
-        * Sub-structure containing metrics of the JobManager.
-        */
-       public static class JobManagerMetricStore extends ComponentMetricStore {
-       }
-
-       /**
-        * Sub-structure containing metrics of a single TaskManager.
-        */
-       public static class TaskManagerMetricStore extends ComponentMetricStore 
{
-               public final Set<String> garbageCollectorNames = new 
HashSet<>();
-
-               public void addGarbageCollectorName(String name) {
-                       garbageCollectorNames.add(name);
-               }
-       }
-
-       /**
-        * Sub-structure containing metrics of a single Job.
-        */
-       public static class JobMetricStore extends ComponentMetricStore {
-               private final Map<String, TaskMetricStore> tasks = new 
HashMap<>();
-
-               public TaskMetricStore getTaskMetricStore(String taskID) {
-                       return tasks.get(taskID);
-               }
-       }
-
-       /**
-        * Sub-structure containing metrics of a single Task.
-        */
-       public static class TaskMetricStore extends ComponentMetricStore {
-               private final Map<Integer, SubtaskMetricStore> subtasks = new 
HashMap<>();
-
-               public SubtaskMetricStore getSubtaskMetricStore(int 
subtaskIndex) {
-                       return subtasks.get(subtaskIndex);
-               }
-       }
-
-       /**
-        * Sub-structure containing metrics of a single Subtask.
-        */
-       public static class SubtaskMetricStore extends ComponentMetricStore {
-       }
-}

Reply via email to