http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
new file mode 100644
index 0000000..95d417a
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandler.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.util.StringUtils;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * A request handler that provides an overview of all TaskManagers or details for a single one.
+ */
+public class TaskManagersHandler extends AbstractJsonRequestHandler {
+
+       private static final String TASKMANAGERS_REST_PATH = "/taskmanagers";
+       private static final String TASKMANAGER_DETAILS_REST_PATH = 
"/taskmanagers/:taskmanagerid";
+
+       public static final String TASK_MANAGER_ID_KEY = "taskmanagerid";
+
+       private final Time timeout;
+
+       private final MetricFetcher fetcher;
+
+       public TaskManagersHandler(Executor executor, Time timeout, 
MetricFetcher fetcher) {
+               super(executor);
+               this.timeout = requireNonNull(timeout);
+               this.fetcher = fetcher;
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{TASKMANAGERS_REST_PATH, 
TASKMANAGER_DETAILS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleJsonRequest(Map<String, String> 
pathParams, Map<String, String> queryParams, JobManagerGateway 
jobManagerGateway) {
+               if (jobManagerGateway != null) {
+                       // Whether the metrics of a single task manager or of all task managers are
+                       // requested, we return them in an array. This avoids unnecessary code complexity.
+                       // If only one task manager is requested, we fetch only that task manager's metrics.
+                       if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+                               InstanceID instanceID = new 
InstanceID(StringUtils.hexStringToByte(pathParams.get(TASK_MANAGER_ID_KEY)));
+                               CompletableFuture<Optional<Instance>> 
tmInstanceFuture = jobManagerGateway.requestTaskManagerInstance(instanceID, 
timeout);
+
+                               return tmInstanceFuture.thenApplyAsync(
+                                       (Optional<Instance> optTaskManager) -> {
+                                               try {
+                                                       return 
writeTaskManagersJson(
+                                                               
optTaskManager.map(Collections::singleton).orElse(Collections.emptySet()),
+                                                               pathParams);
+                                               } catch (IOException e) {
+                                                       throw new 
FlinkFutureException("Could not write TaskManagers JSON.", e);
+                                               }
+                                       },
+                                       executor);
+                       } else {
+                               CompletableFuture<Collection<Instance>> 
tmInstancesFuture = jobManagerGateway.requestTaskManagerInstances(timeout);
+
+                               return tmInstancesFuture.thenApplyAsync(
+                                       (Collection<Instance> taskManagers) -> {
+                                               try {
+                                                       return 
writeTaskManagersJson(taskManagers, pathParams);
+                                               } catch (IOException e) {
+                                                       throw new 
FlinkFutureException("Could not write TaskManagers JSON.", e);
+                                               }
+                                       },
+                                       executor);
+                       }
+               }
+               else {
+                       return FutureUtils.completedExceptionally(new 
Exception("No connection to the leading JobManager."));
+               }
+       }
+
+       private String writeTaskManagersJson(Collection<Instance> instances, 
Map<String, String> pathParams) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               gen.writeStartObject();
+               gen.writeArrayFieldStart("taskmanagers");
+
+               for (Instance instance : instances) {
+                       gen.writeStartObject();
+                       gen.writeStringField("id", instance.getId().toString());
+                       gen.writeStringField("path", 
instance.getTaskManagerGateway().getAddress());
+                       gen.writeNumberField("dataPort", 
instance.getTaskManagerLocation().dataPort());
+                       gen.writeNumberField("timeSinceLastHeartbeat", 
instance.getLastHeartBeat());
+                       gen.writeNumberField("slotsNumber", 
instance.getTotalNumberOfSlots());
+                       gen.writeNumberField("freeSlots", 
instance.getNumberOfAvailableSlots());
+                       gen.writeNumberField("cpuCores", 
instance.getResources().getNumberOfCPUCores());
+                       gen.writeNumberField("physicalMemory", 
instance.getResources().getSizeOfPhysicalMemory());
+                       gen.writeNumberField("freeMemory", 
instance.getResources().getSizeOfJvmHeap());
+                       gen.writeNumberField("managedMemory", 
instance.getResources().getSizeOfManagedMemory());
+
+                       // Only send metrics when a single task manager is requested.
+                       if (pathParams.containsKey(TASK_MANAGER_ID_KEY)) {
+                               fetcher.update();
+                               MetricStore.TaskManagerMetricStore metrics = 
fetcher.getMetricStore().getTaskManagerMetricStore(instance.getId().toString());
+                               if (metrics != null) {
+                                       gen.writeObjectFieldStart("metrics");
+                                       long heapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Used", "0"));
+                                       long heapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Committed", "0"));
+                                       long heapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Heap.Max", "0"));
+
+                                       gen.writeNumberField("heapCommitted", 
heapCommitted);
+                                       gen.writeNumberField("heapUsed", 
heapUsed);
+                                       gen.writeNumberField("heapMax", 
heapTotal);
+
+                                       long nonHeapUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Used", "0"));
+                                       long nonHeapCommitted = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Committed", "0"));
+                                       long nonHeapTotal = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.NonHeap.Max", "0"));
+
+                                       
gen.writeNumberField("nonHeapCommitted", nonHeapCommitted);
+                                       gen.writeNumberField("nonHeapUsed", 
nonHeapUsed);
+                                       gen.writeNumberField("nonHeapMax", 
nonHeapTotal);
+
+                                       gen.writeNumberField("totalCommitted", 
heapCommitted + nonHeapCommitted);
+                                       gen.writeNumberField("totalUsed", 
heapUsed + nonHeapUsed);
+                                       gen.writeNumberField("totalMax", 
heapTotal + nonHeapTotal);
+
+                                       long directCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.Count", "0"));
+                                       long directUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.MemoryUsed", "0"));
+                                       long directMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Direct.TotalCapacity", "0"));
+
+                                       gen.writeNumberField("directCount", 
directCount);
+                                       gen.writeNumberField("directUsed", 
directUsed);
+                                       gen.writeNumberField("directMax", 
directMax);
+
+                                       long mappedCount = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.Count", "0"));
+                                       long mappedUsed = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.MemoryUsed", "0"));
+                                       long mappedMax = 
Long.valueOf(metrics.getMetric("Status.JVM.Memory.Mapped.TotalCapacity", "0"));
+
+                                       gen.writeNumberField("mappedCount", 
mappedCount);
+                                       gen.writeNumberField("mappedUsed", 
mappedUsed);
+                                       gen.writeNumberField("mappedMax", 
mappedMax);
+
+                                       long memorySegmentsAvailable = 
Long.valueOf(metrics.getMetric("Status.Network.AvailableMemorySegments", "0"));
+                                       long memorySegmentsTotal = 
Long.valueOf(metrics.getMetric("Status.Network.TotalMemorySegments", "0"));
+
+                                       
gen.writeNumberField("memorySegmentsAvailable", memorySegmentsAvailable);
+                                       
gen.writeNumberField("memorySegmentsTotal", memorySegmentsTotal);
+
+                                       
gen.writeArrayFieldStart("garbageCollectors");
+
+                                       for (String gcName : 
metrics.garbageCollectorNames) {
+                                               String count = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Count", null);
+                                               String time = 
metrics.getMetric("Status.JVM.GarbageCollector." + gcName + ".Time", null);
+                                               if (count != null  && time != 
null) {
+                                                       gen.writeStartObject();
+                                                       
gen.writeStringField("name", gcName);
+                                                       
gen.writeNumberField("count", Long.valueOf(count));
+                                                       
gen.writeNumberField("time", Long.valueOf(time));
+                                                       gen.writeEndObject();
+                                               }
+                                       }
+
+                                       gen.writeEndArray();
+                                       gen.writeEndObject();
+                               }
+                       }
+
+                       gen.writeEndObject();
+               }
+
+               gen.writeEndArray();
+               gen.writeEndObject();
+
+               gen.close();
+               return writer.toString();
+       }
+}
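
For reference, the handler above serves the legacy REST paths /taskmanagers and /taskmanagers/:taskmanagerid. A minimal client sketch that fetches the JSON produced by writeTaskManagersJson follows; the web monitor address and the task manager ID are assumptions made for the example, not values taken from this change.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class TaskManagersEndpointExample {

	public static void main(String[] args) throws Exception {
		// Assumed web monitor address; adjust to the actual JobManager host and port.
		String base = "http://localhost:8081";

		// Overview of all task managers (no "metrics" object included).
		System.out.println(get(base + "/taskmanagers"));

		// Details for a single task manager, identified by its hex instance ID
		// (hypothetical value); this variant also includes the "metrics" object.
		String taskManagerId = "0123456789abcdef0123456789abcdef";
		System.out.println(get(base + "/taskmanagers/" + taskManagerId));
	}

	private static String get(String url) throws Exception {
		HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
		conn.setRequestMethod("GET");
		StringBuilder body = new StringBuilder();
		try (BufferedReader reader = new BufferedReader(
				new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
			String line;
			while ((line = reader.readLine()) != null) {
				body.append(line);
			}
		}
		return body.toString();
	}
}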

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
new file mode 100644
index 0000000..96bf7ec
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTracker.java
@@ -0,0 +1,333 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Back pressure statistics tracker.
+ *
+ * <p>Back pressure is determined by sampling running tasks. If a task is
+ * slowed down by back pressure it will be stuck in memory requests to a
+ * {@link org.apache.flink.runtime.io.network.buffer.LocalBufferPool}.
+ *
+ * <p>The back pressured stack traces look like this:
+ *
+ * <pre>
+ * java.lang.Object.wait(Native Method)
+ * o.a.f.[...].LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
+ * o.a.f.[...].LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133) <--- BLOCKING request
+ * [...]
+ * </pre>
+ */
+public class BackPressureStatsTracker {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(BackPressureStatsTracker.class);
+
+       /** Maximum stack trace depth for samples. */
+       static final int MAX_STACK_TRACE_DEPTH = 3;
+
+       /** Expected class name for back pressure indicating stack trace 
element. */
+       static final String EXPECTED_CLASS_NAME = 
"org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
+
+       /** Expected method name for back pressure indicating stack trace 
element. */
+       static final String EXPECTED_METHOD_NAME = "requestBufferBlocking";
+
+       /** Lock guarding trigger operations. */
+       private final Object lock = new Object();
+
+       /** Stack trace sample coordinator. */
+       private final StackTraceSampleCoordinator coordinator;
+
+       /**
+        * Completed stats. Important: Job vertex IDs need to be scoped by job 
ID,
+        * because they are potentially constant across runs messing up the 
cached
+        * data.
+        */
+       private final Cache<ExecutionJobVertex, OperatorBackPressureStats> 
operatorStatsCache;
+
+       /** Pending in progress stats. Important: Job vertex IDs need to be 
scoped
+        * by job ID, because they are potentially constant across runs messing 
up
+        * the cached data.*/
+       private final Set<ExecutionJobVertex> pendingStats = new HashSet<>();
+
+       /** Cleanup interval for completed stats cache. */
+       private final int cleanUpInterval;
+
+       private final int numSamples;
+
+       private final Time delayBetweenSamples;
+
+       /** Flag indicating whether the stats tracker has been shut down. */
+       private boolean shutDown;
+
+       /**
+        * Creates a back pressure statistics tracker.
+        *
+        * @param cleanUpInterval     Clean up interval for completed stats.
+        * @param numSamples          Number of stack trace samples when 
determining back pressure.
+        * @param delayBetweenSamples Delay between samples when determining 
back pressure.
+        */
+       public BackPressureStatsTracker(
+                       StackTraceSampleCoordinator coordinator,
+                       int cleanUpInterval,
+                       int numSamples,
+                       Time delayBetweenSamples) {
+
+               this.coordinator = checkNotNull(coordinator, "Stack trace 
sample coordinator");
+
+               checkArgument(cleanUpInterval >= 0, "Clean up interval");
+               this.cleanUpInterval = cleanUpInterval;
+
+               checkArgument(numSamples >= 1, "Number of samples");
+               this.numSamples = numSamples;
+
+               this.delayBetweenSamples = checkNotNull(delayBetweenSamples, 
"Delay between samples");
+
+               this.operatorStatsCache = CacheBuilder.newBuilder()
+                               .concurrencyLevel(1)
+                               .expireAfterAccess(cleanUpInterval, 
TimeUnit.MILLISECONDS)
+                               .build();
+       }
+
+       /** Cleanup interval for completed stats cache. */
+       public long getCleanUpInterval() {
+               return cleanUpInterval;
+       }
+
+       /**
+        * Returns back pressure statistics for an operator.
+        *
+        * @param vertex Operator to get the stats for.
+        *
+        * @return Back pressure statistics for an operator
+        */
+       public Optional<OperatorBackPressureStats> 
getOperatorBackPressureStats(ExecutionJobVertex vertex) {
+               return 
Optional.ofNullable(operatorStatsCache.getIfPresent(vertex));
+       }
+
+       /**
+        * Triggers a stack trace sample for an operator to gather the back pressure
+        * statistics. If there is a sample in progress for the operator, the call
+        * is ignored.
+        *
+        * @param vertex Operator to get the stats for.
+        * @return Flag indicating whether a sample was triggered.
+        */
+       @SuppressWarnings("unchecked")
+       public boolean triggerStackTraceSample(ExecutionJobVertex vertex) {
+               synchronized (lock) {
+                       if (shutDown) {
+                               return false;
+                       }
+
+                       if (!pendingStats.contains(vertex) &&
+                                       
!vertex.getGraph().getState().isGloballyTerminalState()) {
+
+                               Executor executor = 
vertex.getGraph().getFutureExecutor();
+
+                               // Only trigger if the job is still active.
+                               if (executor != null) {
+                                       pendingStats.add(vertex);
+
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Triggering stack 
trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
+                                       }
+
+                                       CompletableFuture<StackTraceSample> 
sample = coordinator.triggerStackTraceSample(
+                                                       
vertex.getTaskVertices(),
+                                                       numSamples,
+                                                       delayBetweenSamples,
+                                                       MAX_STACK_TRACE_DEPTH);
+
+                                       sample.handleAsync(new 
StackTraceSampleCompletionCallback(vertex), executor);
+
+                                       return true;
+                               }
+                       }
+
+                       return false;
+               }
+       }
+
+       /**
+        * Cleans up the operator stats cache if it contains timed out entries.
+        *
+        * <p>The Guava cache only evicts entries during maintenance performed as part of
+        * normal cache operations. If this tracker is inactive, the cache is never cleaned up.
+        */
+       public void cleanUpOperatorStatsCache() {
+               operatorStatsCache.cleanUp();
+       }
+
+       /**
+        * Shuts down the stats tracker.
+        *
+        * <p>Invalidates the cache and clears all pending stats.
+        */
+       public void shutDown() {
+               synchronized (lock) {
+                       if (!shutDown) {
+                               operatorStatsCache.invalidateAll();
+                               pendingStats.clear();
+
+                               shutDown = true;
+                       }
+               }
+       }
+
+       /**
+        * Invalidates the cache (irrespective of clean up interval).
+        */
+       void invalidateOperatorStatsCache() {
+               operatorStatsCache.invalidateAll();
+       }
+
+       /**
+        * Callback on completed stack trace sample.
+        */
+       class StackTraceSampleCompletionCallback implements 
BiFunction<StackTraceSample, Throwable, Void> {
+
+               private final ExecutionJobVertex vertex;
+
+               public StackTraceSampleCompletionCallback(ExecutionJobVertex 
vertex) {
+                       this.vertex = vertex;
+               }
+
+               @Override
+               public Void apply(StackTraceSample stackTraceSample, Throwable 
throwable) {
+                       synchronized (lock) {
+                               try {
+                                       if (shutDown) {
+                                               return null;
+                                       }
+
+                                       // Job finished, ignore.
+                                       JobStatus jobState = 
vertex.getGraph().getState();
+                                       if (jobState.isGloballyTerminalState()) 
{
+                                               LOG.debug("Ignoring sample, 
because job is in state " + jobState + ".");
+                                       } else if (stackTraceSample != null) {
+                                               OperatorBackPressureStats stats 
= createStatsFromSample(stackTraceSample);
+                                               operatorStatsCache.put(vertex, 
stats);
+                                       } else {
+                                               LOG.debug("Failed to gather 
stack trace sample.", throwable);
+                                       }
+                               } catch (Throwable t) {
+                                       LOG.error("Error during stats 
completion.", t);
+                               } finally {
+                                       pendingStats.remove(vertex);
+                               }
+
+                               return null;
+                       }
+               }
+
+               /**
+                * Creates the back pressure stats from a stack trace sample.
+                *
+                * @param sample Stack trace sample to base stats on.
+                *
+                * @return Back pressure stats
+                */
+               private OperatorBackPressureStats 
createStatsFromSample(StackTraceSample sample) {
+                       Map<ExecutionAttemptID, List<StackTraceElement[]>> 
traces = sample.getStackTraces();
+
+                       // Map task ID to subtask index, because the web 
interface expects
+                       // it like that.
+                       Map<ExecutionAttemptID, Integer> subtaskIndexMap = Maps
+                                       
.newHashMapWithExpectedSize(traces.size());
+
+                       Set<ExecutionAttemptID> sampledTasks = 
sample.getStackTraces().keySet();
+
+                       for (ExecutionVertex task : vertex.getTaskVertices()) {
+                               ExecutionAttemptID taskId = 
task.getCurrentExecutionAttempt().getAttemptId();
+                               if (sampledTasks.contains(taskId)) {
+                                       subtaskIndexMap.put(taskId, 
task.getParallelSubtaskIndex());
+                               } else {
+                                       LOG.debug("Outdated sample. A task that is part of the " +
+                                                       "sample has been reset.");
+                               }
+                       }
+
+                       // Ratio of blocked samples to total samples per sub 
task. Array
+                       // position corresponds to sub task index.
+                       double[] backPressureRatio = new double[traces.size()];
+
+                       for (Entry<ExecutionAttemptID, 
List<StackTraceElement[]>> entry : traces.entrySet()) {
+                               int backPressureSamples = 0;
+
+                               List<StackTraceElement[]> taskTraces = 
entry.getValue();
+
+                               for (StackTraceElement[] trace : taskTraces) {
+                                       for (int i = trace.length - 1; i >= 0; 
i--) {
+                                               StackTraceElement elem = 
trace[i];
+
+                                               if 
(elem.getClassName().equals(EXPECTED_CLASS_NAME) &&
+                                                               
elem.getMethodName().equals(EXPECTED_METHOD_NAME)) {
+
+                                                       backPressureSamples++;
+                                                       break; // Continue with 
next stack trace
+                                               }
+                                       }
+                               }
+
+                               int subtaskIndex = 
subtaskIndexMap.get(entry.getKey());
+
+                               int size = taskTraces.size();
+                               double ratio = (size > 0)
+                                               ? ((double) 
backPressureSamples) / size
+                                               : 0;
+
+                               backPressureRatio[subtaskIndex] = ratio;
+                       }
+
+                       return new OperatorBackPressureStats(
+                                       sample.getSampleId(),
+                                       sample.getEndTime(),
+                                       backPressureRatio);
+               }
+       }
+}
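
For reference, a self-contained sketch of the per-task ratio computation performed by createStatsFromSample above: a trace counts towards back pressure if any frame matches the blocking buffer request in LocalBufferPool. The traces built in main are fabricated for the example.

import java.util.Arrays;
import java.util.List;

public class BackPressureRatioExample {

	static final String EXPECTED_CLASS_NAME =
			"org.apache.flink.runtime.io.network.buffer.LocalBufferPool";
	static final String EXPECTED_METHOD_NAME = "requestBufferBlocking";

	/** Fraction of one task's traces that contain the blocking buffer request. */
	static double backPressureRatio(List<StackTraceElement[]> taskTraces) {
		int backPressureSamples = 0;
		for (StackTraceElement[] trace : taskTraces) {
			// Walk each trace from the bottom, as the tracker does.
			for (int i = trace.length - 1; i >= 0; i--) {
				StackTraceElement elem = trace[i];
				if (elem.getClassName().equals(EXPECTED_CLASS_NAME)
						&& elem.getMethodName().equals(EXPECTED_METHOD_NAME)) {
					backPressureSamples++;
					break; // continue with the next trace
				}
			}
		}
		return taskTraces.isEmpty() ? 0.0 : ((double) backPressureSamples) / taskTraces.size();
	}

	public static void main(String[] args) {
		// One fabricated trace blocked in the buffer pool, one that is not.
		StackTraceElement blocked = new StackTraceElement(
				EXPECTED_CLASS_NAME, EXPECTED_METHOD_NAME, "LocalBufferPool.java", 133);
		StackTraceElement running = new StackTraceElement(
				"org.example.UserFunction", "map", "UserFunction.java", 42);

		List<StackTraceElement[]> traces = Arrays.asList(
				new StackTraceElement[] {blocked},
				new StackTraceElement[] {running});

		System.out.println(backPressureRatio(traces)); // prints 0.5
	}
}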

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java
new file mode 100644
index 0000000..1a78a17
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/OperatorBackPressureStats.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import java.util.Arrays;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Back pressure statistics of multiple tasks.
+ *
+ * <p>Statistics are gathered by sampling stack traces of running tasks. The
+ * back pressure ratio denotes the ratio of traces indicating back pressure
+ * to the total number of sampled traces.
+ */
+public class OperatorBackPressureStats {
+
+       /** ID of the corresponding sample. */
+       private final int sampleId;
+
+       /** End time stamp of the corresponding sample. */
+       private final long endTimestamp;
+
+       /** Back pressure ratio per subtask. */
+       private final double[] subTaskBackPressureRatio;
+
+       /** Maximum back pressure ratio. */
+       private final double maxSubTaskBackPressureRatio;
+
+       public OperatorBackPressureStats(
+                       int sampleId,
+                       long endTimestamp,
+                       double[] subTaskBackPressureRatio) {
+
+               this.sampleId = sampleId;
+               this.endTimestamp = endTimestamp;
+               this.subTaskBackPressureRatio = 
checkNotNull(subTaskBackPressureRatio, "Sub task back pressure ratio");
+               checkArgument(subTaskBackPressureRatio.length >= 1, "No Sub 
task back pressure ratio specified");
+
+               double max = 0;
+               for (double ratio : subTaskBackPressureRatio) {
+                       if (ratio > max) {
+                               max = ratio;
+                       }
+               }
+
+               maxSubTaskBackPressureRatio = max;
+       }
+
+       /**
+        * Returns the ID of the sample.
+        *
+        * @return ID of the sample
+        */
+       public int getSampleId() {
+               return sampleId;
+       }
+
+       /**
+        * Returns the time stamp, when all stack traces were collected at the
+        * JobManager.
+        *
+        * @return Time stamp, when all stack traces were collected at the
+        * JobManager
+        */
+       public long getEndTimestamp() {
+               return endTimestamp;
+       }
+
+       /**
+        * Returns the number of sub tasks.
+        *
+        * @return Number of sub tasks.
+        */
+       public int getNumberOfSubTasks() {
+               return subTaskBackPressureRatio.length;
+       }
+
+       /**
+        * Returns the ratio of stack traces indicating back pressure to total
+        * number of sampled stack traces.
+        *
+        * @param index Subtask index.
+        *
+        * @return Ratio of stack traces indicating back pressure to total 
number
+        * of sampled stack traces.
+        */
+       public double getBackPressureRatio(int index) {
+               return subTaskBackPressureRatio[index];
+       }
+
+       /**
+        * Returns the maximum back pressure ratio of all sub tasks.
+        *
+        * @return Maximum back pressure ratio of all sub tasks.
+        */
+       public double getMaxBackPressureRatio() {
+               return maxSubTaskBackPressureRatio;
+       }
+
+       @Override
+       public String toString() {
+               return "OperatorBackPressureStats{" +
+                               "sampleId=" + sampleId +
+                               ", endTimestamp=" + endTimestamp +
+                               ", subTaskBackPressureRatio=" + 
Arrays.toString(subTaskBackPressureRatio) +
+                               '}';
+       }
+}
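
For reference, a small usage sketch of the class above with assumed sample ID, timestamp, and ratios; the per-subtask array is indexed by subtask index and the maximum ratio is computed once in the constructor.

import org.apache.flink.runtime.rest.handler.legacy.backpressure.OperatorBackPressureStats;

public class OperatorBackPressureStatsExample {

	public static void main(String[] args) {
		// Assumed sample ID, timestamp, and per-subtask ratios (indexed by subtask index).
		OperatorBackPressureStats stats = new OperatorBackPressureStats(
				1,
				System.currentTimeMillis(),
				new double[] {0.0, 0.25, 0.6});

		System.out.println(stats.getNumberOfSubTasks());     // 3
		System.out.println(stats.getBackPressureRatio(2));   // 0.6
		System.out.println(stats.getMaxBackPressureRatio()); // 0.6 (computed in the constructor)
	}
}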

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java
new file mode 100644
index 0000000..dda4e33
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSample.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A sample of stack traces for one or more tasks.
+ *
+ * <p>The sampling is triggered in {@link StackTraceSampleCoordinator}.
+ */
+public class StackTraceSample {
+
+       /** ID of this sample (unique per job). */
+       private final int sampleId;
+
+       /** Time stamp, when the sample was triggered. */
+       private final long startTime;
+
+       /** Time stamp, when all stack traces were collected at the JobManager. 
*/
+       private final long endTime;
+
+       /** Map of stack traces by execution ID. */
+       private final Map<ExecutionAttemptID, List<StackTraceElement[]>> 
stackTracesByTask;
+
+       /**
+        * Creates a stack trace sample.
+        *
+        * @param sampleId          ID of the sample.
+        * @param startTime         Time stamp, when the sample was triggered.
+        * @param endTime           Time stamp, when all stack traces were
+        *                          collected at the JobManager.
+        * @param stackTracesByTask Map of stack traces by execution ID.
+        */
+       public StackTraceSample(
+                       int sampleId,
+                       long startTime,
+                       long endTime,
+                       Map<ExecutionAttemptID, List<StackTraceElement[]>> 
stackTracesByTask) {
+
+               checkArgument(sampleId >= 0, "Negative sample ID");
+               checkArgument(startTime >= 0, "Negative start time");
+               checkArgument(endTime >= startTime, "End time before start 
time");
+
+               this.sampleId = sampleId;
+               this.startTime = startTime;
+               this.endTime = endTime;
+               this.stackTracesByTask = 
Collections.unmodifiableMap(stackTracesByTask);
+       }
+
+       /**
+        * Returns the ID of the sample.
+        *
+        * @return ID of the sample
+        */
+       public int getSampleId() {
+               return sampleId;
+       }
+
+       /**
+        * Returns the time stamp, when the sample was triggered.
+        *
+        * @return Time stamp, when the sample was triggered
+        */
+       public long getStartTime() {
+               return startTime;
+       }
+
+       /**
+        * Returns the time stamp, when all stack traces were collected at the
+        * JobManager.
+        *
+        * @return Time stamp, when all stack traces were collected at the
+        * JobManager
+        */
+       public long getEndTime() {
+               return endTime;
+       }
+
+       /**
+        * Returns a map of stack traces by execution ID.
+        *
+        * @return Map of stack traces by execution ID
+        */
+       public Map<ExecutionAttemptID, List<StackTraceElement[]>> 
getStackTraces() {
+               return stackTracesByTask;
+       }
+
+       @Override
+       public String toString() {
+               return "StackTraceSample{" +
+                               "sampleId=" + sampleId +
+                               ", startTime=" + startTime +
+                               ", endTime=" + endTime +
+                               '}';
+       }
+}
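
For reference, a construction sketch for the sample holder above; the execution attempt ID and the captured trace are fabricated for the example, and the no-argument ExecutionAttemptID constructor is assumed to generate a random ID. The map passed in is exposed through an unmodifiable view.

import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSample;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StackTraceSampleExample {

	public static void main(String[] args) {
		Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new HashMap<>();
		// One fabricated task with the current thread's stack trace as its only sample.
		traces.put(new ExecutionAttemptID(),
				Collections.singletonList(Thread.currentThread().getStackTrace()));

		long now = System.currentTimeMillis();
		StackTraceSample sample = new StackTraceSample(0, now - 100L, now, traces);

		System.out.println(sample.getStackTraces().size()); // 1
		System.out.println(sample);                         // StackTraceSample{sampleId=0, ...}
	}
}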

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java
new file mode 100644
index 0000000..8c2ec6e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinator.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Maps;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A coordinator for triggering and collecting stack traces of running tasks.
+ */
+public class StackTraceSampleCoordinator {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(StackTraceSampleCoordinator.class);
+
+       private static final int NUM_GHOST_SAMPLE_IDS = 10;
+
+       private final Object lock = new Object();
+
+       /** Executor used to run the futures. */
+       private final Executor executor;
+
+       /** Time out after the expected sampling duration. */
+       private final long sampleTimeout;
+
+       /** In progress samples (guarded by lock). */
+       private final Map<Integer, PendingStackTraceSample> pendingSamples = 
new HashMap<>();
+
+       /** A list of recent sample IDs to identify late messages vs. invalid 
ones. */
+       private final ArrayDeque<Integer> recentPendingSamples = new 
ArrayDeque<>(NUM_GHOST_SAMPLE_IDS);
+
+       /** Sample ID counter (guarded by lock). */
+       private int sampleIdCounter;
+
+       /**
+        * Flag indicating whether the coordinator is still running (guarded by
+        * lock).
+        */
+       private boolean isShutDown;
+
+       /**
+        * Creates a new coordinator for the job.
+        *
+        * @param executor to use to execute the futures
+        * @param sampleTimeout Time out after the expected sampling duration.
+        *                      This is added to the expected duration of a
+        *                      sample, which is determined by the number of
+        *                      samples and the delay between each sample.
+        */
+       public StackTraceSampleCoordinator(Executor executor, long 
sampleTimeout) {
+               checkArgument(sampleTimeout >= 0L);
+               this.executor = Preconditions.checkNotNull(executor);
+               this.sampleTimeout = sampleTimeout;
+       }
+
+       /**
+        * Triggers a stack trace sample to all tasks.
+        *
+        * @param tasksToSample       Tasks to sample.
+        * @param numSamples          Number of stack trace samples to collect.
+        * @param delayBetweenSamples Delay between consecutive samples.
+        * @param maxStackTraceDepth  Maximum depth of the stack trace. 0 
indicates
+        *                            no maximum and keeps the complete stack 
trace.
+        * @return A future of the completed stack trace sample
+        */
+       @SuppressWarnings("unchecked")
+       public CompletableFuture<StackTraceSample> triggerStackTraceSample(
+                       ExecutionVertex[] tasksToSample,
+                       int numSamples,
+                       Time delayBetweenSamples,
+                       int maxStackTraceDepth) {
+
+               checkNotNull(tasksToSample, "Tasks to sample");
+               checkArgument(tasksToSample.length >= 1, "No tasks to sample");
+               checkArgument(numSamples >= 1, "No number of samples");
+               checkArgument(maxStackTraceDepth >= 0, "Negative maximum stack 
trace depth");
+
+               // Execution IDs of running tasks
+               ExecutionAttemptID[] triggerIds = new 
ExecutionAttemptID[tasksToSample.length];
+               Execution[] executions = new Execution[tasksToSample.length];
+
+               // Check that all tasks are RUNNING before triggering anything. 
The
+               // triggering can still fail.
+               for (int i = 0; i < triggerIds.length; i++) {
+                       Execution execution = 
tasksToSample[i].getCurrentExecutionAttempt();
+                       if (execution != null && execution.getState() == 
ExecutionState.RUNNING) {
+                               executions[i] = execution;
+                               triggerIds[i] = execution.getAttemptId();
+                       } else {
+                               return FutureUtils.completedExceptionally(new 
IllegalStateException("Task " + tasksToSample[i]
+                                       .getTaskNameWithSubtaskIndex() + " is 
not running."));
+                       }
+               }
+
+               synchronized (lock) {
+                       if (isShutDown) {
+                               return FutureUtils.completedExceptionally(new 
IllegalStateException("Shut down"));
+                       }
+
+                       final int sampleId = sampleIdCounter++;
+
+                       LOG.debug("Triggering stack trace sample {}", sampleId);
+
+                       final PendingStackTraceSample pending = new 
PendingStackTraceSample(
+                                       sampleId, triggerIds);
+
+                       // Discard the sample if it takes too long. We don't 
send cancel
+                       // messages to the task managers, but only wait for the 
responses
+                       // and then ignore them.
+                       long expectedDuration = numSamples * 
delayBetweenSamples.toMilliseconds();
+                       Time timeout = Time.milliseconds(expectedDuration + 
sampleTimeout);
+
+                       // Add the pending sample before scheduling the discard 
task to
+                       // prevent races with removing it again.
+                       pendingSamples.put(sampleId, pending);
+
+                       // Trigger all samples
+                       for (Execution execution: executions) {
+                               final 
CompletableFuture<StackTraceSampleResponse> stackTraceSampleFuture = 
execution.requestStackTraceSample(
+                                       sampleId,
+                                       numSamples,
+                                       delayBetweenSamples,
+                                       maxStackTraceDepth,
+                                       timeout);
+
+                               stackTraceSampleFuture.handleAsync(
+                                       (StackTraceSampleResponse 
stackTraceSampleResponse, Throwable throwable) -> {
+                                               if (stackTraceSampleResponse != 
null) {
+                                                       collectStackTraces(
+                                                               
stackTraceSampleResponse.getSampleId(),
+                                                               
stackTraceSampleResponse.getExecutionAttemptID(),
+                                                               
stackTraceSampleResponse.getSamples());
+                                               } else {
+                                                       
cancelStackTraceSample(sampleId, throwable);
+                                               }
+
+                                               return null;
+                                       },
+                                       executor);
+                       }
+
+                       return pending.getStackTraceSampleFuture();
+               }
+       }
+
+       /**
+        * Cancels a pending sample.
+        *
+        * @param sampleId ID of the sample to cancel.
+        * @param cause Cause of the cancellation (can be <code>null</code>).
+        */
+       public void cancelStackTraceSample(int sampleId, Throwable cause) {
+               synchronized (lock) {
+                       if (isShutDown) {
+                               return;
+                       }
+
+                       PendingStackTraceSample sample = 
pendingSamples.remove(sampleId);
+                       if (sample != null) {
+                               if (cause != null) {
+                                       LOG.info("Cancelling sample " + 
sampleId, cause);
+                               } else {
+                                       LOG.info("Cancelling sample {}", 
sampleId);
+                               }
+
+                               sample.discard(cause);
+                               rememberRecentSampleId(sampleId);
+                       }
+               }
+       }
+
+       /**
+        * Shuts down the coordinator.
+        *
+        * <p>After shut down, no further operations are executed.
+        */
+       public void shutDown() {
+               synchronized (lock) {
+                       if (!isShutDown) {
+                               LOG.info("Shutting down stack trace sample 
coordinator.");
+
+                               for (PendingStackTraceSample pending : 
pendingSamples.values()) {
+                                       pending.discard(new 
RuntimeException("Shut down"));
+                               }
+
+                               pendingSamples.clear();
+
+                               isShutDown = true;
+                       }
+               }
+       }
+
+       /**
+        * Collects stack traces of a task.
+        *
+        * @param sampleId    ID of the sample.
+        * @param executionId ID of the sampled task.
+        * @param stackTraces Stack traces of the sampled task.
+        *
+        * @throws IllegalStateException If the sample ID is unknown and does not
+        *                               belong to a recently finished or cancelled sample.
+        */
+       public void collectStackTraces(
+                       int sampleId,
+                       ExecutionAttemptID executionId,
+                       List<StackTraceElement[]> stackTraces) {
+
+               synchronized (lock) {
+                       if (isShutDown) {
+                               return;
+                       }
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Collecting stack trace sample {} of 
task {}", sampleId, executionId);
+                       }
+
+                       PendingStackTraceSample pending = 
pendingSamples.get(sampleId);
+
+                       if (pending != null) {
+                               pending.collectStackTraces(executionId, 
stackTraces);
+
+                               // Publish the sample
+                               if (pending.isComplete()) {
+                                       pendingSamples.remove(sampleId);
+                                       rememberRecentSampleId(sampleId);
+
+                                       pending.completePromiseAndDiscard();
+                               }
+                       } else if (recentPendingSamples.contains(sampleId)) {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Received late stack trace 
sample {} of task {}",
+                                                       sampleId, executionId);
+                               }
+                       } else {
+                               if (LOG.isDebugEnabled()) {
+                                       LOG.debug("Unknown sample ID " + 
sampleId);
+                               }
+                       }
+               }
+       }
+
+       private void rememberRecentSampleId(int sampleId) {
+               if (recentPendingSamples.size() >= NUM_GHOST_SAMPLE_IDS) {
+                       recentPendingSamples.removeFirst();
+               }
+               recentPendingSamples.addLast(sampleId);
+       }
+
+       int getNumberOfPendingSamples() {
+               synchronized (lock) {
+                       return pendingSamples.size();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A pending stack trace sample, which collects stack traces and owns a
+        * {@link StackTraceSample} promise.
+        *
+        * <p>Pending samples must only be accessed while holding the lock.
+        */
+       private static class PendingStackTraceSample {
+
+               private final int sampleId;
+               private final long startTime;
+               private final Set<ExecutionAttemptID> pendingTasks;
+               private final Map<ExecutionAttemptID, 
List<StackTraceElement[]>> stackTracesByTask;
+               private final CompletableFuture<StackTraceSample> 
stackTraceFuture;
+
+               private boolean isDiscarded;
+
+               PendingStackTraceSample(
+                               int sampleId,
+                               ExecutionAttemptID[] tasksToCollect) {
+
+                       this.sampleId = sampleId;
+                       this.startTime = System.currentTimeMillis();
+                       this.pendingTasks = new 
HashSet<>(Arrays.asList(tasksToCollect));
+                       this.stackTracesByTask = 
Maps.newHashMapWithExpectedSize(tasksToCollect.length);
+                       this.stackTraceFuture = new CompletableFuture<>();
+               }
+
+               int getSampleId() {
+                       return sampleId;
+               }
+
+               long getStartTime() {
+                       return startTime;
+               }
+
+               boolean isDiscarded() {
+                       return isDiscarded;
+               }
+
+               boolean isComplete() {
+                       if (isDiscarded) {
+                               throw new IllegalStateException("Discarded");
+                       }
+
+                       return pendingTasks.isEmpty();
+               }
+
+               void discard(Throwable cause) {
+                       if (!isDiscarded) {
+                               pendingTasks.clear();
+                               stackTracesByTask.clear();
+
+                               stackTraceFuture.completeExceptionally(new 
RuntimeException("Discarded", cause));
+
+                               isDiscarded = true;
+                       }
+               }
+
+               void collectStackTraces(ExecutionAttemptID executionId, 
List<StackTraceElement[]> stackTraces) {
+                       if (isDiscarded) {
+                               throw new IllegalStateException("Discarded");
+                       }
+
+                       if (pendingTasks.remove(executionId)) {
+                               stackTracesByTask.put(executionId, 
Collections.unmodifiableList(stackTraces));
+                       } else if (isComplete()) {
+                               throw new IllegalStateException("Completed");
+                       } else {
+                               throw new IllegalArgumentException("Unknown 
task " + executionId);
+                       }
+               }
+
+               void completePromiseAndDiscard() {
+                       if (isComplete()) {
+                               isDiscarded = true;
+
+                               long endTime = System.currentTimeMillis();
+
+                               StackTraceSample stackTraceSample = new 
StackTraceSample(
+                                               sampleId,
+                                               startTime,
+                                               endTime,
+                                               stackTracesByTask);
+
+                               stackTraceFuture.complete(stackTraceSample);
+                       } else {
+                               throw new IllegalStateException("Not completed 
yet");
+                       }
+               }
+
+               @SuppressWarnings("unchecked")
+               CompletableFuture<StackTraceSample> getStackTraceSampleFuture() 
{
+                       return stackTraceFuture;
+               }
+       }
+}
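
The pending-sample bookkeeping above follows a common collect-then-complete pattern: each response removes a key from a pending set, and the future completes once the set is empty. As a generic illustration only (plain JDK types and hypothetical names, not the Flink classes from this commit):

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;

    // Generic collect-then-complete sketch; names are hypothetical.
    final class PendingResponses<K, V> {
        private final Set<K> pending;
        private final Map<K, V> collected = new ConcurrentHashMap<>();
        private final CompletableFuture<Map<K, V>> future = new CompletableFuture<>();

        PendingResponses(Set<K> expected) {
            this.pending = new HashSet<>(expected);
        }

        // Record one response; complete the future once nothing is pending.
        synchronized void collect(K key, V value) {
            if (pending.remove(key)) {
                collected.put(key, value);
                if (pending.isEmpty()) {
                    future.complete(collected);
                }
            }
        }

        CompletableFuture<Map<K, V>> getFuture() {
            return future;
        }
    }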

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
new file mode 100644
index 0000000..2086628
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import 
org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler that returns a job's snapshotting settings.
+ */
+public class CheckpointConfigHandler extends 
AbstractExecutionGraphRequestHandler {
+
+       private static final String CHECKPOINT_CONFIG_REST_PATH = 
"/jobs/:jobid/checkpoints/config";
+
+       public CheckpointConfigHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{CHECKPOINT_CONFIG_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return 
createCheckpointConfigJson(graph);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create checkpoint config json.", e);
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Archivist for the CheckpointConfigHandler.
+        */
+       public static class CheckpointConfigJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String json = createCheckpointConfigJson(graph);
+                       String path = CHECKPOINT_CONFIG_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       return Collections.singletonList(new ArchivedJson(path, 
json));
+               }
+       }
+
+       private static String createCheckpointConfigJson(AccessExecutionGraph 
graph) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+               JobCheckpointingSettings settings = 
graph.getJobCheckpointingSettings();
+
+               if (settings == null) {
+                       return "{}";
+               }
+
+               gen.writeStartObject();
+               {
+                       gen.writeStringField("mode", settings.isExactlyOnce() ? 
"exactly_once" : "at_least_once");
+                       gen.writeNumberField("interval", 
settings.getCheckpointInterval());
+                       gen.writeNumberField("timeout", 
settings.getCheckpointTimeout());
+                       gen.writeNumberField("min_pause", 
settings.getMinPauseBetweenCheckpoints());
+                       gen.writeNumberField("max_concurrent", 
settings.getMaxConcurrentCheckpoints());
+
+                       ExternalizedCheckpointSettings externalization = 
settings.getExternalizedCheckpointSettings();
+                       gen.writeObjectFieldStart("externalization");
+                       {
+                               if (externalization.externalizeCheckpoints()) {
+                                       gen.writeBooleanField("enabled", true);
+                                       
gen.writeBooleanField("delete_on_cancellation", 
externalization.deleteOnCancellation());
+                               } else {
+                                       gen.writeBooleanField("enabled", false);
+                               }
+                       }
+                       gen.writeEndObject();
+
+               }
+               gen.writeEndObject();
+
+               gen.close();
+
+               return writer.toString();
+       }
+}
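
For orientation, a hypothetical client-side read of the endpoint this handler serves. The host, port (assuming the default web UI port 8081) and job id are made up; the field names are the ones written by createCheckpointConfigJson above:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;

    import java.net.URL;

    public class CheckpointConfigClientSketch {
        public static void main(String[] args) throws Exception {
            String jobId = "7684be6004e4e955c2a558a9bc463f65"; // made-up job id

            // Fetch and parse /jobs/:jobid/checkpoints/config; returns "{}" if
            // the job has no checkpointing settings.
            JsonNode config = new ObjectMapper().readTree(
                new URL("http://localhost:8081/jobs/" + jobId + "/checkpoints/config"));

            System.out.println("mode:         " + config.get("mode").asText());
            System.out.println("interval:     " + config.get("interval").asLong());
            System.out.println("timeout:      " + config.get("timeout").asLong());
            System.out.println("externalized: " + config.get("externalization").get("enabled").asBoolean());
        }
    }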

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
new file mode 100644
index 0000000..f21fc76
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCache.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+
+import org.apache.flink.shaded.guava18.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava18.com.google.common.cache.CacheBuilder;
+
+import javax.annotation.Nullable;
+
+/**
+ * A size-based cache of accessed completed and failed checkpoint stats.
+ *
+ * <p>Having this cache in place improves the user experience quite a bit,
+ * because checkpoint stats that a user has accessed stay available and
+ * don't expire. Without it, a checkpoint that the user clicks on in the
+ * history may no longer be available from the stats as soon as another
+ * checkpoint is triggered and evicts it. With the cache in place, the
+ * checkpoint remains available for investigation.
+ */
+public class CheckpointStatsCache {
+
+       @Nullable
+       private final Cache<Long, AbstractCheckpointStats> cache;
+
+       public CheckpointStatsCache(int maxNumEntries) {
+               if (maxNumEntries > 0) {
+                       this.cache = CacheBuilder.<Long, 
AbstractCheckpointStats>newBuilder()
+                               .maximumSize(maxNumEntries)
+                               .build();
+               } else {
+                       this.cache = null;
+               }
+       }
+
+       /**
+        * Try to add the checkpoint to the cache.
+        *
+        * @param checkpoint Checkpoint to be added.
+        */
+       void tryAdd(AbstractCheckpointStats checkpoint) {
+               // Don't add in progress checkpoints as they will be replaced 
by their
+               // completed/failed version eventually.
+               if (cache != null && checkpoint != null && 
!checkpoint.getStatus().isInProgress()) {
+                       cache.put(checkpoint.getCheckpointId(), checkpoint);
+               }
+       }
+
+       /**
+        * Try to look up a checkpoint by its ID in the cache.
+        *
+        * @param checkpointId ID of the checkpoint to look up.
+        * @return The checkpoint or <code>null</code> if checkpoint not found.
+        */
+       AbstractCheckpointStats tryGet(long checkpointId) {
+               if (cache != null) {
+                       return cache.getIfPresent(checkpointId);
+               } else {
+                       return null;
+               }
+       }
+
+}
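
A sketch of how the cache is meant to be consulted, based only on the methods above. tryAdd and tryGet are package-private, so such code would have to live in the same package; in-progress checkpoints are ignored by tryAdd, and a maxNumEntries of zero or less disables caching entirely:

    // Same package as CheckpointStatsCache, since tryAdd/tryGet are package-private.
    package org.apache.flink.runtime.rest.handler.legacy.checkpoints;

    import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;

    class CheckpointStatsCacheSketch {

        // Illustrative lookup: prefer the checkpoint still present in the stats
        // history, remember it for later, and otherwise fall back to the cache.
        static AbstractCheckpointStats lookup(
                CheckpointStatsCache cache,
                AbstractCheckpointStats fromHistory,
                long checkpointId) {

            if (fromHistory != null) {
                cache.tryAdd(fromHistory); // no-op for in-progress checkpoints
                return fromHistory;
            }
            // Checkpoint already left the history; may still be cached.
            return cache.tryGet(checkpointId);
        }
    }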

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
new file mode 100644
index 0000000..61ebeda
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import 
org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler that returns checkpoint stats for a single checkpoint.
+ */
+public class CheckpointStatsDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
+
+       private static final String CHECKPOINT_STATS_DETAILS_REST_PATH = 
"/jobs/:jobid/checkpoints/details/:checkpointid";
+
+       private final CheckpointStatsCache cache;
+
+       public CheckpointStatsDetailsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+               super(executionGraphHolder, executor);
+               this.cache = cache;
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{CHECKPOINT_STATS_DETAILS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               long checkpointId = parseCheckpointId(params);
+                               if (checkpointId == -1) {
+                                       return "{}";
+                               }
+
+                               CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
+                               if (snapshot == null) {
+                                       return "{}";
+                               }
+
+                               AbstractCheckpointStats checkpoint = 
snapshot.getHistory().getCheckpointById(checkpointId);
+
+                               if (checkpoint != null) {
+                                       cache.tryAdd(checkpoint);
+                               } else {
+                                       checkpoint = cache.tryGet(checkpointId);
+
+                                       if (checkpoint == null) {
+                                               return "{}";
+                                       }
+                               }
+
+                               try {
+                                       return 
createCheckpointDetailsJson(checkpoint);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create checkpoint details json.", e);
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Archivist for the CheckpointStatsDetailsHandler.
+        */
+       public static class CheckpointStatsDetailsJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       CheckpointStatsSnapshot stats = 
graph.getCheckpointStatsSnapshot();
+                       if (stats == null) {
+                               return Collections.emptyList();
+                       }
+                       CheckpointStatsHistory history = stats.getHistory();
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AbstractCheckpointStats checkpoint : 
history.getCheckpoints()) {
+                               String json = 
createCheckpointDetailsJson(checkpoint);
+                               String path = CHECKPOINT_STATS_DETAILS_REST_PATH
+                                       .replace(":jobid", 
graph.getJobID().toString())
+                                       .replace(":checkpointid", 
String.valueOf(checkpoint.getCheckpointId()));
+                               archive.add(new ArchivedJson(path, json));
+                       }
+                       return archive;
+               }
+       }
+
+       public static String 
createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws 
IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+               gen.writeStartObject();
+
+               gen.writeNumberField("id", checkpoint.getCheckpointId());
+               gen.writeStringField("status", 
checkpoint.getStatus().toString());
+               gen.writeBooleanField("is_savepoint", 
checkpoint.getProperties().isSavepoint());
+               gen.writeNumberField("trigger_timestamp", 
checkpoint.getTriggerTimestamp());
+               gen.writeNumberField("latest_ack_timestamp", 
checkpoint.getLatestAckTimestamp());
+               gen.writeNumberField("state_size", checkpoint.getStateSize());
+               gen.writeNumberField("end_to_end_duration", 
checkpoint.getEndToEndDuration());
+               gen.writeNumberField("alignment_buffered", 
checkpoint.getAlignmentBuffered());
+               gen.writeNumberField("num_subtasks", 
checkpoint.getNumberOfSubtasks());
+               gen.writeNumberField("num_acknowledged_subtasks", 
checkpoint.getNumberOfAcknowledgedSubtasks());
+
+               if (checkpoint.getStatus().isCompleted()) {
+                       // --- Completed ---
+                       CompletedCheckpointStats completed = 
(CompletedCheckpointStats) checkpoint;
+
+                       String externalPath = completed.getExternalPath();
+                       if (externalPath != null) {
+                               gen.writeStringField("external_path", 
externalPath);
+                       }
+
+                       gen.writeBooleanField("discarded", 
completed.isDiscarded());
+               }
+               else if (checkpoint.getStatus().isFailed()) {
+                       // --- Failed ---
+                       FailedCheckpointStats failed = (FailedCheckpointStats) 
checkpoint;
+
+                       gen.writeNumberField("failure_timestamp", 
failed.getFailureTimestamp());
+
+                       String failureMsg = failed.getFailureMessage();
+                       if (failureMsg != null) {
+                               gen.writeStringField("failure_message", 
failureMsg);
+                       }
+               }
+
+               gen.writeObjectFieldStart("tasks");
+               for (TaskStateStats taskStats : 
checkpoint.getAllTaskStateStats()) {
+                       
gen.writeObjectFieldStart(taskStats.getJobVertexId().toString());
+
+                       gen.writeNumberField("latest_ack_timestamp", 
taskStats.getLatestAckTimestamp());
+                       gen.writeNumberField("state_size", 
taskStats.getStateSize());
+                       gen.writeNumberField("end_to_end_duration", 
taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+                       gen.writeNumberField("alignment_buffered", 
taskStats.getAlignmentBuffered());
+                       gen.writeNumberField("num_subtasks", 
taskStats.getNumberOfSubtasks());
+                       gen.writeNumberField("num_acknowledged_subtasks", 
taskStats.getNumberOfAcknowledgedSubtasks());
+
+                       gen.writeEndObject();
+               }
+               gen.writeEndObject();
+
+               gen.writeEndObject();
+               gen.close();
+
+               return writer.toString();
+       }
+
+       /**
+        * Returns the checkpoint ID parsed from the provided parameters.
+        *
+        * @param params Path parameters
+        * @return Parsed checkpoint ID or <code>-1</code> if not available.
+        */
+       static long parseCheckpointId(Map<String, String> params) {
+               String param = params.get("checkpointid");
+               if (param == null) {
+                       return -1;
+               }
+
+               try {
+                       return Long.parseLong(param);
+               } catch (NumberFormatException ignored) {
+                       return -1;
+               }
+       }
+}
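
The path-parameter parsing is deliberately forgiving: missing and malformed checkpoint ids both collapse to -1, which the handler then answers with "{}". A small same-package sketch of that behavior (hypothetical class name):

    package org.apache.flink.runtime.rest.handler.legacy.checkpoints;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;

    class ParseCheckpointIdSketch {
        public static void main(String[] args) {
            Map<String, String> params = new HashMap<>();
            params.put("checkpointid", "42");

            // A valid id is parsed as-is.
            System.out.println(CheckpointStatsDetailsHandler.parseCheckpointId(params)); // 42

            // Missing and malformed ids both collapse to -1.
            System.out.println(CheckpointStatsDetailsHandler.parseCheckpointId(Collections.emptyMap())); // -1
            params.put("checkpointid", "not-a-number");
            System.out.println(CheckpointStatsDetailsHandler.parseCheckpointId(params)); // -1
        }
    }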

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
new file mode 100644
index 0000000..22a8db2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import 
org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import 
org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Request handler that returns checkpoint stats for a single job vertex with
+ * the summary stats and all subtasks.
+ */
+public class CheckpointStatsDetailsSubtasksHandler extends 
AbstractExecutionGraphRequestHandler {
+
+       private static final String CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH 
= "/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid";
+
+       private final CheckpointStatsCache cache;
+
+       public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor, CheckpointStatsCache cache) {
+               super(executionGraphHolder, executor);
+               this.cache = checkNotNull(cache);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new 
String[]{CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleJsonRequest(
+                       Map<String, String> pathParams,
+                       Map<String, String> queryParams,
+                       JobManagerGateway jobManagerGateway) {
+               return super.handleJsonRequest(pathParams, queryParams, 
jobManagerGateway);
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               long checkpointId = 
CheckpointStatsDetailsHandler.parseCheckpointId(params);
+               if (checkpointId == -1) {
+                       return CompletableFuture.completedFuture("{}");
+               }
+
+               JobVertexID vertexId = 
AbstractJobVertexRequestHandler.parseJobVertexId(params);
+               if (vertexId == null) {
+                       return CompletableFuture.completedFuture("{}");
+               }
+
+               CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
+               if (snapshot == null) {
+                       return CompletableFuture.completedFuture("{}");
+               }
+
+               AbstractCheckpointStats checkpoint = 
snapshot.getHistory().getCheckpointById(checkpointId);
+
+               if (checkpoint != null) {
+                       cache.tryAdd(checkpoint);
+               } else {
+                       checkpoint = cache.tryGet(checkpointId);
+
+                       if (checkpoint == null) {
+                               return CompletableFuture.completedFuture("{}");
+                       }
+               }
+
+               TaskStateStats taskStats = 
checkpoint.getTaskStateStats(vertexId);
+               if (taskStats == null) {
+                       return CompletableFuture.completedFuture("{}");
+               }
+
+               try {
+                       return 
CompletableFuture.completedFuture(createSubtaskCheckpointDetailsJson(checkpoint,
 taskStats));
+               } catch (IOException e) {
+                       return FutureUtils.completedExceptionally(e);
+               }
+       }
+
+       /**
+        * Archivist for the CheckpointStatsDetailsSubtasksHandler.
+        */
+       public static class CheckpointStatsDetailsSubtasksJsonArchivist 
implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       CheckpointStatsSnapshot stats = 
graph.getCheckpointStatsSnapshot();
+                       if (stats == null) {
+                               return Collections.emptyList();
+                       }
+                       CheckpointStatsHistory history = stats.getHistory();
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AbstractCheckpointStats checkpoint : 
history.getCheckpoints()) {
+                               for (TaskStateStats subtaskStats : 
checkpoint.getAllTaskStateStats()) {
+                                       String json = 
createSubtaskCheckpointDetailsJson(checkpoint, subtaskStats);
+                                       String path = 
CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH
+                                               .replace(":jobid", 
graph.getJobID().toString())
+                                               .replace(":checkpointid", 
String.valueOf(checkpoint.getCheckpointId()))
+                                               .replace(":vertexid", 
subtaskStats.getJobVertexId().toString());
+                                       archive.add(new ArchivedJson(path, 
json));
+                               }
+                       }
+                       return archive;
+               }
+       }
+
+       private static String 
createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, 
TaskStateStats taskStats) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               gen.writeStartObject();
+               // Overview
+               gen.writeNumberField("id", checkpoint.getCheckpointId());
+               gen.writeStringField("status", 
checkpoint.getStatus().toString());
+               gen.writeNumberField("latest_ack_timestamp", 
taskStats.getLatestAckTimestamp());
+               gen.writeNumberField("state_size", taskStats.getStateSize());
+               gen.writeNumberField("end_to_end_duration", 
taskStats.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+               gen.writeNumberField("alignment_buffered", 
taskStats.getAlignmentBuffered());
+               gen.writeNumberField("num_subtasks", 
taskStats.getNumberOfSubtasks());
+               gen.writeNumberField("num_acknowledged_subtasks", 
taskStats.getNumberOfAcknowledgedSubtasks());
+
+               if (taskStats.getNumberOfAcknowledgedSubtasks() > 0) {
+                       gen.writeObjectFieldStart("summary");
+                       gen.writeObjectFieldStart("state_size");
+                       CheckpointStatsHandler.writeMinMaxAvg(gen, 
taskStats.getSummaryStats().getStateSizeStats());
+                       gen.writeEndObject();
+
+                       gen.writeObjectFieldStart("end_to_end_duration");
+                       MinMaxAvgStats ackTimestampStats = 
taskStats.getSummaryStats().getAckTimestampStats();
+                       gen.writeNumberField("min", Math.max(0, 
ackTimestampStats.getMinimum() - checkpoint.getTriggerTimestamp()));
+                       gen.writeNumberField("max", Math.max(0, 
ackTimestampStats.getMaximum() - checkpoint.getTriggerTimestamp()));
+                       gen.writeNumberField("avg", Math.max(0, 
ackTimestampStats.getAverage() - checkpoint.getTriggerTimestamp()));
+                       gen.writeEndObject();
+
+                       gen.writeObjectFieldStart("checkpoint_duration");
+                       gen.writeObjectFieldStart("sync");
+                       CheckpointStatsHandler.writeMinMaxAvg(gen, 
taskStats.getSummaryStats().getSyncCheckpointDurationStats());
+                       gen.writeEndObject();
+                       gen.writeObjectFieldStart("async");
+                       CheckpointStatsHandler.writeMinMaxAvg(gen, 
taskStats.getSummaryStats().getAsyncCheckpointDurationStats());
+                       gen.writeEndObject();
+                       gen.writeEndObject();
+
+                       gen.writeObjectFieldStart("alignment");
+                       gen.writeObjectFieldStart("buffered");
+                       CheckpointStatsHandler.writeMinMaxAvg(gen, 
taskStats.getSummaryStats().getAlignmentBufferedStats());
+                       gen.writeEndObject();
+                       gen.writeObjectFieldStart("duration");
+                       CheckpointStatsHandler.writeMinMaxAvg(gen, 
taskStats.getSummaryStats().getAlignmentDurationStats());
+                       gen.writeEndObject();
+                       gen.writeEndObject();
+                       gen.writeEndObject();
+               }
+
+               SubtaskStateStats[] subtasks = taskStats.getSubtaskStats();
+
+               gen.writeArrayFieldStart("subtasks");
+               for (int i = 0; i < subtasks.length; i++) {
+                       SubtaskStateStats subtask = subtasks[i];
+
+                       gen.writeStartObject();
+                       gen.writeNumberField("index", i);
+
+                       if (subtask != null) {
+                               gen.writeStringField("status", "completed");
+                               gen.writeNumberField("ack_timestamp", 
subtask.getAckTimestamp());
+                               gen.writeNumberField("end_to_end_duration", 
subtask.getEndToEndDuration(checkpoint.getTriggerTimestamp()));
+                               gen.writeNumberField("state_size", 
subtask.getStateSize());
+
+                               gen.writeObjectFieldStart("checkpoint");
+                               gen.writeNumberField("sync", 
subtask.getSyncCheckpointDuration());
+                               gen.writeNumberField("async", 
subtask.getAsyncCheckpointDuration());
+                               gen.writeEndObject();
+
+                               gen.writeObjectFieldStart("alignment");
+                               gen.writeNumberField("buffered", 
subtask.getAlignmentBuffered());
+                               gen.writeNumberField("duration", 
subtask.getAlignmentDuration());
+                               gen.writeEndObject();
+                       } else {
+                               gen.writeStringField("status", 
"pending_or_failed");
+                       }
+                       gen.writeEndObject();
+               }
+               gen.writeEndArray();
+
+               gen.writeEndObject();
+               gen.close();
+
+               return writer.toString();
+       }
+
+}
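
One detail worth noting in createSubtaskCheckpointDetailsJson: the summary end_to_end_duration is derived from the ack-timestamp statistics relative to the trigger timestamp and clamped at zero, rather than read from a duration statistic directly. With made-up timestamps:

    // Derivation of the summary "end_to_end_duration" fields above (made-up values).
    long triggerTimestamp = 1_500_000_000_000L; // checkpoint trigger time
    long minAck           = 1_500_000_000_120L; // earliest subtask acknowledgement
    long maxAck           = 1_500_000_000_870L; // latest subtask acknowledgement

    long minDuration = Math.max(0, minAck - triggerTimestamp); // 120 ms
    long maxDuration = Math.max(0, maxAck - triggerTimestamp); // 870 ms
    // The avg field is computed the same way from the average ack timestamp;
    // the Math.max(0, ...) clamp guards against clock skew producing negatives.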

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
new file mode 100644
index 0000000..abb353e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
+import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
+import org.apache.flink.runtime.concurrent.FlinkFutureException;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import 
org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler that returns checkpoint statistics for a job.
+ */
+public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler {
+
+       private static final String CHECKPOINT_STATS_REST_PATH = 
"/jobs/:jobid/checkpoints";
+
+       public CheckpointStatsHandler(ExecutionGraphHolder 
executionGraphHolder, Executor executor) {
+               super(executionGraphHolder, executor);
+       }
+
+       @Override
+       public String[] getPaths() {
+               return new String[]{CHECKPOINT_STATS_REST_PATH};
+       }
+
+       @Override
+       public CompletableFuture<String> handleRequest(AccessExecutionGraph 
graph, Map<String, String> params) {
+               return CompletableFuture.supplyAsync(
+                       () -> {
+                               try {
+                                       return createCheckpointStatsJson(graph);
+                               } catch (IOException e) {
+                                       throw new FlinkFutureException("Could 
not create checkpoint stats json.", e);
+                               }
+                       },
+                       executor);
+       }
+
+       /**
+        * Archivist for the CheckpointStatsHandler.
+        */
+       public static class CheckpointStatsJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String json = createCheckpointStatsJson(graph);
+                       String path = CHECKPOINT_STATS_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       return Collections.singletonList(new ArchivedJson(path, 
json));
+               }
+       }
+
+       private static String createCheckpointStatsJson(AccessExecutionGraph 
graph) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = 
JsonFactory.JACKSON_FACTORY.createGenerator(writer);
+
+               CheckpointStatsSnapshot snapshot = 
graph.getCheckpointStatsSnapshot();
+               if (snapshot == null) {
+                       return "{}";
+               }
+
+               gen.writeStartObject();
+
+               // Counts
+               writeCounts(gen, snapshot.getCounts());
+
+               // Summary
+               writeSummary(gen, snapshot.getSummaryStats());
+
+               CheckpointStatsHistory history = snapshot.getHistory();
+
+               // Latest
+               writeLatestCheckpoints(
+                       gen,
+                       history.getLatestCompletedCheckpoint(),
+                       history.getLatestSavepoint(),
+                       history.getLatestFailedCheckpoint(),
+                       snapshot.getLatestRestoredCheckpoint());
+
+               // History
+               writeHistory(gen, snapshot.getHistory());
+
+               gen.writeEndObject();
+               gen.close();
+
+               return writer.toString();
+       }
+
+       private static void writeCounts(JsonGenerator gen, 
CheckpointStatsCounts counts) throws IOException {
+               gen.writeObjectFieldStart("counts");
+               gen.writeNumberField("restored", 
counts.getNumberOfRestoredCheckpoints());
+               gen.writeNumberField("total", 
counts.getTotalNumberOfCheckpoints());
+               gen.writeNumberField("in_progress", 
counts.getNumberOfInProgressCheckpoints());
+               gen.writeNumberField("completed", 
counts.getNumberOfCompletedCheckpoints());
+               gen.writeNumberField("failed", 
counts.getNumberOfFailedCheckpoints());
+               gen.writeEndObject();
+       }
+
+       private static void writeSummary(
+               JsonGenerator gen,
+               CompletedCheckpointStatsSummary summary) throws IOException {
+               gen.writeObjectFieldStart("summary");
+               gen.writeObjectFieldStart("state_size");
+               writeMinMaxAvg(gen, summary.getStateSizeStats());
+               gen.writeEndObject();
+
+               gen.writeObjectFieldStart("end_to_end_duration");
+               writeMinMaxAvg(gen, summary.getEndToEndDurationStats());
+               gen.writeEndObject();
+
+               gen.writeObjectFieldStart("alignment_buffered");
+               writeMinMaxAvg(gen, summary.getAlignmentBufferedStats());
+               gen.writeEndObject();
+               gen.writeEndObject();
+       }
+
+       static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) 
throws IOException {
+               gen.writeNumberField("min", minMaxAvg.getMinimum());
+               gen.writeNumberField("max", minMaxAvg.getMaximum());
+               gen.writeNumberField("avg", minMaxAvg.getAverage());
+       }
+
+       private static void writeLatestCheckpoints(
+               JsonGenerator gen,
+               @Nullable CompletedCheckpointStats completed,
+               @Nullable CompletedCheckpointStats savepoint,
+               @Nullable FailedCheckpointStats failed,
+               @Nullable RestoredCheckpointStats restored) throws IOException {
+
+               gen.writeObjectFieldStart("latest");
+               // Completed checkpoint
+               if (completed != null) {
+                       gen.writeObjectFieldStart("completed");
+                       writeCheckpoint(gen, completed);
+
+                       String externalPath = completed.getExternalPath();
+                       if (externalPath != null) {
+                               gen.writeStringField("external_path", 
completed.getExternalPath());
+                       }
+
+                       gen.writeEndObject();
+               }
+
+               // Completed savepoint
+               if (savepoint != null) {
+                       gen.writeObjectFieldStart("savepoint");
+                       writeCheckpoint(gen, savepoint);
+
+                       String externalPath = savepoint.getExternalPath();
+                       if (externalPath != null) {
+                               gen.writeStringField("external_path", 
savepoint.getExternalPath());
+                       }
+                       gen.writeEndObject();
+               }
+
+               // Failed checkpoint
+               if (failed != null) {
+                       gen.writeObjectFieldStart("failed");
+                       writeCheckpoint(gen, failed);
+
+                       gen.writeNumberField("failure_timestamp", 
failed.getFailureTimestamp());
+                       String failureMsg = failed.getFailureMessage();
+                       if (failureMsg != null) {
+                               gen.writeStringField("failure_message", 
failureMsg);
+                       }
+                       gen.writeEndObject();
+               }
+
+               // Restored checkpoint
+               if (restored != null) {
+                       gen.writeObjectFieldStart("restored");
+                       gen.writeNumberField("id", restored.getCheckpointId());
+                       gen.writeNumberField("restore_timestamp", 
restored.getRestoreTimestamp());
+                       gen.writeBooleanField("is_savepoint", 
restored.getProperties().isSavepoint());
+
+                       String externalPath = restored.getExternalPath();
+                       if (externalPath != null) {
+                               gen.writeStringField("external_path", 
externalPath);
+                       }
+                       gen.writeEndObject();
+               }
+               gen.writeEndObject();
+       }
+
+       private static void writeCheckpoint(JsonGenerator gen, 
AbstractCheckpointStats checkpoint) throws IOException {
+               gen.writeNumberField("id", checkpoint.getCheckpointId());
+               gen.writeNumberField("trigger_timestamp", 
checkpoint.getTriggerTimestamp());
+               gen.writeNumberField("latest_ack_timestamp", 
checkpoint.getLatestAckTimestamp());
+               gen.writeNumberField("state_size", checkpoint.getStateSize());
+               gen.writeNumberField("end_to_end_duration", 
checkpoint.getEndToEndDuration());
+               gen.writeNumberField("alignment_buffered", 
checkpoint.getAlignmentBuffered());
+
+       }
+
+       private static void writeHistory(JsonGenerator gen, 
CheckpointStatsHistory history) throws IOException {
+               gen.writeArrayFieldStart("history");
+               for (AbstractCheckpointStats checkpoint : 
history.getCheckpoints()) {
+                       gen.writeStartObject();
+                       gen.writeNumberField("id", 
checkpoint.getCheckpointId());
+                       gen.writeStringField("status", 
checkpoint.getStatus().toString());
+                       gen.writeBooleanField("is_savepoint", 
checkpoint.getProperties().isSavepoint());
+                       gen.writeNumberField("trigger_timestamp", 
checkpoint.getTriggerTimestamp());
+                       gen.writeNumberField("latest_ack_timestamp", 
checkpoint.getLatestAckTimestamp());
+                       gen.writeNumberField("state_size", 
checkpoint.getStateSize());
+                       gen.writeNumberField("end_to_end_duration", 
checkpoint.getEndToEndDuration());
+                       gen.writeNumberField("alignment_buffered", 
checkpoint.getAlignmentBuffered());
+                       gen.writeNumberField("num_subtasks", 
checkpoint.getNumberOfSubtasks());
+                       gen.writeNumberField("num_acknowledged_subtasks", 
checkpoint.getNumberOfAcknowledgedSubtasks());
+
+                       if (checkpoint.getStatus().isCompleted()) {
+                               // --- Completed ---
+                               CompletedCheckpointStats completed = 
(CompletedCheckpointStats) checkpoint;
+
+                               String externalPath = 
completed.getExternalPath();
+                               if (externalPath != null) {
+                                       gen.writeStringField("external_path", 
externalPath);
+                               }
+
+                               gen.writeBooleanField("discarded", 
completed.isDiscarded());
+                       }
+                       else if (checkpoint.getStatus().isFailed()) {
+                               // --- Failed ---
+                               FailedCheckpointStats failed = 
(FailedCheckpointStats) checkpoint;
+
+                               gen.writeNumberField("failure_timestamp", 
failed.getFailureTimestamp());
+
+                               String failureMsg = failed.getFailureMessage();
+                               if (failureMsg != null) {
+                                       gen.writeStringField("failure_message", 
failureMsg);
+                               }
+                       }
+
+                       gen.writeEndObject();
+               }
+               gen.writeEndArray();
+       }
+}
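
writeMinMaxAvg only emits the three number fields; opening and closing the enclosing JSON object is the caller's job, as writeSummary shows above. A self-contained Jackson sketch of that contract, with a stand-in helper that takes plain longs instead of a MinMaxAvgStats:

    import com.fasterxml.jackson.core.JsonFactory;
    import com.fasterxml.jackson.core.JsonGenerator;

    import java.io.StringWriter;

    public class MinMaxAvgSketch {
        public static void main(String[] args) throws Exception {
            StringWriter writer = new StringWriter();
            JsonGenerator gen = new JsonFactory().createGenerator(writer);

            gen.writeStartObject();
            gen.writeObjectFieldStart("state_size"); // caller opens the enclosing object
            writeMinMaxAvg(gen, 100, 2048, 512);     // helper writes only the three fields
            gen.writeEndObject();                    // ... and closes it again
            gen.writeEndObject();
            gen.close();

            System.out.println(writer); // {"state_size":{"min":100,"max":2048,"avg":512}}
        }

        // Stand-in for CheckpointStatsHandler.writeMinMaxAvg, taking plain longs.
        static void writeMinMaxAvg(JsonGenerator gen, long min, long max, long avg) throws Exception {
            gen.writeNumberField("min", min);
            gen.writeNumberField("max", max);
            gen.writeNumberField("avg", avg);
        }
    }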
