http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java
new file mode 100644
index 0000000..3783b84
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import 
org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.util.TestLogger;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SubtasksTimesHandler.
+ */
+public class SubtasksTimesHandlerTest extends TestLogger {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
SubtasksTimesHandler.SubtasksTimesJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
+               AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/vertices/" + originalTask.getJobVertexId() + "/subtasktimes", 
archive.getPath());
+               compareSubtaskTimes(originalTask, originalAttempt, 
archive.getJson());
+       }
+
+       @Test
+       public void testGetPaths() {
+               SubtasksTimesHandler handler = new 
SubtasksTimesHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]);
+       }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
+               AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
+               String json = 
SubtasksTimesHandler.createSubtaskTimesJson(originalTask);
+
+               compareSubtaskTimes(originalTask, originalAttempt, json);
+       }
+
+       private static void compareSubtaskTimes(AccessExecutionJobVertex 
originalTask, AccessExecution originalAttempt, String json) throws IOException {
+               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
+
+               Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());
+               Assert.assertEquals(originalTask.getName(), 
result.get("name").asText());
+               Assert.assertTrue(result.get("now").asLong() > 0L);
+
+               ArrayNode subtasks = (ArrayNode) result.get("subtasks");
+
+               JsonNode subtask = subtasks.get(0);
+               Assert.assertEquals(0, subtask.get("subtask").asInt());
+               
Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(),
 subtask.get("host").asText());
+               
Assert.assertEquals(originalAttempt.getStateTimestamp(originalAttempt.getState())
 - originalAttempt.getStateTimestamp(ExecutionState.SCHEDULED), 
subtask.get("duration").asLong());
+
+               JsonNode timestamps = subtask.get("timestamps");
+
+               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CREATED), 
timestamps.get(ExecutionState.CREATED.name()).asLong());
+               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.SCHEDULED),
 timestamps.get(ExecutionState.SCHEDULED.name()).asLong());
+               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING),
 timestamps.get(ExecutionState.DEPLOYING.name()).asLong());
+               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.RUNNING), 
timestamps.get(ExecutionState.RUNNING.name()).asLong());
+               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.FINISHED), 
timestamps.get(ExecutionState.FINISHED.name()).asLong());
+               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CANCELING),
 timestamps.get(ExecutionState.CANCELING.name()).asLong());
+               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CANCELED), 
timestamps.get(ExecutionState.CANCELED.name()).asLong());
+               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.FAILED), 
timestamps.get(ExecutionState.FAILED.name()).asLong());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
new file mode 100644
index 0000000..b65dcb6
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandlerTest.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.blob.VoidBlobStore;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobManagerGateway;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.isA;
+import static org.powermock.api.mockito.PowerMockito.mock;
+import static org.powermock.api.mockito.PowerMockito.when;
+
+/**
+ * Tests for the TaskManagerLogHandler.
+ */
+public class TaskManagerLogHandlerTest {
+       @Test
+       public void testGetPaths() {
+               TaskManagerLogHandler handlerLog = new TaskManagerLogHandler(
+                       mock(GatewayRetriever.class),
+                       Executors.directExecutor(),
+                       CompletableFuture.completedFuture("/jm/address"),
+                       TestingUtils.TIMEOUT(),
+                       TaskManagerLogHandler.FileMode.LOG,
+                       new Configuration(),
+                       new VoidBlobStore());
+               String[] pathsLog = handlerLog.getPaths();
+               Assert.assertEquals(1, pathsLog.length);
+               Assert.assertEquals("/taskmanagers/:taskmanagerid/log", 
pathsLog[0]);
+
+               TaskManagerLogHandler handlerOut = new TaskManagerLogHandler(
+                       mock(GatewayRetriever.class),
+                       Executors.directExecutor(),
+                       CompletableFuture.completedFuture("/jm/address"),
+                       TestingUtils.TIMEOUT(),
+                       TaskManagerLogHandler.FileMode.STDOUT,
+                       new Configuration(),
+                       new VoidBlobStore());
+               String[] pathsOut = handlerOut.getPaths();
+               Assert.assertEquals(1, pathsOut.length);
+               Assert.assertEquals("/taskmanagers/:taskmanagerid/stdout", 
pathsOut[0]);
+       }
+
+       @Test
+       public void testLogFetchingFailure() throws Exception {
+               // ========= setup TaskManager 
=================================================================================
+               InstanceID tmID = new InstanceID();
+               ResourceID tmRID = new ResourceID(tmID.toString());
+               TaskManagerGateway taskManagerGateway = 
mock(TaskManagerGateway.class);
+               when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
+
+               Instance taskManager = mock(Instance.class);
+               when(taskManager.getId()).thenReturn(tmID);
+               when(taskManager.getTaskManagerID()).thenReturn(tmRID);
+               
when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
+               CompletableFuture<BlobKey> future = new CompletableFuture<>();
+               future.completeExceptionally(new IOException("failure"));
+               
when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
+
+               // ========= setup JobManager 
==================================================================================
+
+               JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
+               
when(jobManagerGateway.requestBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(1337));
+               when(jobManagerGateway.getHostname()).thenReturn("localhost");
+               
when(jobManagerGateway.requestTaskManagerInstance(any(InstanceID.class), 
any(Time.class))).thenReturn(
+                       
CompletableFuture.completedFuture(Optional.of(taskManager)));
+
+               GatewayRetriever<JobManagerGateway> retriever = 
mock(GatewayRetriever.class);
+               when(retriever.getNow())
+                       .thenReturn(Optional.of(jobManagerGateway));
+
+               TaskManagerLogHandler handler = new TaskManagerLogHandler(
+                       retriever,
+                       Executors.directExecutor(),
+                       CompletableFuture.completedFuture("/jm/address"),
+                       TestingUtils.TIMEOUT(),
+                       TaskManagerLogHandler.FileMode.LOG,
+                       new Configuration(),
+                       new VoidBlobStore());
+
+               final AtomicReference<String> exception = new 
AtomicReference<>();
+
+               ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+               when(ctx.write(isA(ByteBuf.class))).thenAnswer(new 
Answer<Object>() {
+                       @Override
+                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
+                               ByteBuf data = 
invocationOnMock.getArgumentAt(0, ByteBuf.class);
+                               exception.set(new String(data.array(), 
ConfigConstants.DEFAULT_CHARSET));
+                               return null;
+                       }
+               });
+
+               Map<String, String> pathParams = new HashMap<>();
+               pathParams.put(TaskManagersHandler.TASK_MANAGER_ID_KEY, 
tmID.toString());
+               Routed routed = mock(Routed.class);
+               when(routed.pathParams()).thenReturn(pathParams);
+               when(routed.request()).thenReturn(new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + 
tmID + "/log"));
+
+               handler.respondAsLeader(ctx, routed, jobManagerGateway);
+
+               Assert.assertEquals("Fetching TaskManager log failed.", 
exception.get());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandlerTest.java
new file mode 100644
index 0000000..2f8afd1
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagersHandlerTest.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.concurrent.Executors;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+
+/**
+ * Tests for the TaskManagersHandler.
+ */
+public class TaskManagersHandlerTest {
+       @Test
+       public void testGetPaths() {
+               TaskManagersHandler handler = new 
TaskManagersHandler(Executors.directExecutor(), Time.seconds(0L), null);
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(2, paths.length);
+               List<String> pathsList = Lists.newArrayList(paths);
+               Assert.assertTrue(pathsList.contains("/taskmanagers"));
+               
Assert.assertTrue(pathsList.contains("/taskmanagers/:taskmanagerid"));
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
new file mode 100644
index 0000000..e2289f0
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerITCase.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.MemoryType;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.duration.FiniteDuration;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
+import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
+import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
+import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+
+/**
+ * Integration test for the BackPressureStatsTracker using a simple back-pressured task.
+ */
+public class BackPressureStatsTrackerITCase extends TestLogger {
+
+       private static NetworkBufferPool networkBufferPool;
+       private static ActorSystem testActorSystem;
+
+       /** Shared as static variable with the test task. */
+       private static BufferPool testBufferPool;
+
+       @BeforeClass
+       public static void setup() {
+               testActorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
+               networkBufferPool = new NetworkBufferPool(100, 8192, 
MemoryType.HEAP);
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(testActorSystem);
+               networkBufferPool.destroy();
+       }
+
+       /**
+        * Tests a simple fake back-pressured task. Back pressure is assumed 
when
+        * sampled stack traces are in blocking buffer requests.
+        */
+       @Test
+       public void testBackPressuredProducer() throws Exception {
+               new JavaTestKit(testActorSystem) {{
+                       final FiniteDuration deadline = new FiniteDuration(60, 
TimeUnit.SECONDS);
+
+                       // The JobGraph
+                       final JobGraph jobGraph = new JobGraph();
+                       final int parallelism = 4;
+
+                       final JobVertex task = new JobVertex("Task");
+                       task.setInvokableClass(BackPressuredTask.class);
+                       task.setParallelism(parallelism);
+
+                       jobGraph.addVertex(task);
+
+                       final Configuration config = new Configuration();
+
+                       final HighAvailabilityServices highAvailabilityServices 
= HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+                               config,
+                               TestingUtils.defaultExecutor());
+
+                       ActorGateway jobManger = null;
+                       ActorGateway taskManager = null;
+
+                       //
+                       // 1) Consume all buffers at first (no buffers for the 
test task)
+                       //
+                       testBufferPool = networkBufferPool.createBufferPool(1, 
Integer.MAX_VALUE);
+                       final List<Buffer> buffers = new ArrayList<>();
+                       while (true) {
+                               Buffer buffer = testBufferPool.requestBuffer();
+                               if (buffer != null) {
+                                       buffers.add(buffer);
+                               } else {
+                                       break;
+                               }
+                       }
+
+                       try {
+                               jobManger = TestingUtils.createJobManager(
+                                       testActorSystem,
+                                       TestingUtils.defaultExecutor(),
+                                       TestingUtils.defaultExecutor(),
+                                       config,
+                                       highAvailabilityServices);
+
+                               
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+
+                               taskManager = TestingUtils.createTaskManager(
+                                       testActorSystem,
+                                       highAvailabilityServices,
+                                       config,
+                                       true,
+                                       true);
+
+                               final ActorGateway jm = jobManger;
+
+                               new Within(deadline) {
+                                       @Override
+                                       protected void run() {
+                                               try {
+                                                       ActorGateway testActor 
= new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+                                                       // Submit the job and 
wait until it is running
+                                                       
JobClient.submitJobDetached(
+                                                                       new 
AkkaJobManagerGateway(jm),
+                                                                       config,
+                                                                       
jobGraph,
+                                                                       
Time.milliseconds(deadline.toMillis()),
+                                                                       
ClassLoader.getSystemClassLoader());
+
+                                                       jm.tell(new 
WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);
+
+                                                       expectMsgEquals(new 
AllVerticesRunning(jobGraph.getJobID()));
+
+                                                       // Get the 
ExecutionGraph
+                                                       jm.tell(new 
RequestExecutionGraph(jobGraph.getJobID()), testActor);
+
+                                                       ExecutionGraphFound 
executionGraphResponse =
+                                                                       
expectMsgClass(ExecutionGraphFound.class);
+
+                                                       ExecutionGraph 
executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
+                                                       ExecutionJobVertex 
vertex = executionGraph.getJobVertex(task.getID());
+
+                                                       
StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
+                                                                       
testActorSystem.dispatcher(), 60000);
+
+                                                       // Verify back pressure 
(clean up interval can be ignored)
+                                                       
BackPressureStatsTracker statsTracker = new BackPressureStatsTracker(
+                                                               coordinator,
+                                                               100 * 1000,
+                                                               20,
+                                                               
Time.milliseconds(10L));
+
+                                                       int numAttempts = 10;
+
+                                                       int nextSampleId = 0;
+
+                                                       // Verify that all 
tasks are back pressured. This
+                                                       // can fail if the task 
takes longer to request
+                                                       // the buffer.
+                                                       for (int attempt = 0; 
attempt < numAttempts; attempt++) {
+                                                               try {
+                                                                       
OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
+
+                                                                       
Assert.assertEquals(nextSampleId + attempt, stats.getSampleId());
+                                                                       
Assert.assertEquals(parallelism, stats.getNumberOfSubTasks());
+                                                                       
Assert.assertEquals(1.0, stats.getMaxBackPressureRatio(), 0.0);
+
+                                                                       for 
(int i = 0; i < parallelism; i++) {
+                                                                               
Assert.assertEquals(1.0, stats.getBackPressureRatio(i), 0.0);
+                                                                       }
+
+                                                                       
nextSampleId = stats.getSampleId() + 1;
+
+                                                                       break;
+                                                               } catch 
(Throwable t) {
+                                                                       if 
(attempt == numAttempts - 1) {
+                                                                               
throw t;
+                                                                       } else {
+                                                                               
Thread.sleep(500);
+                                                                       }
+                                                               }
+                                                       }
+
+                                                       //
+                                                       // 2) Release all 
buffers and let the tasks grab one
+                                                       //
+                                                       for (Buffer buf : 
buffers) {
+                                                               buf.recycle();
+                                                       }
+
+                                                       // Wait for all buffers 
to be available. The tasks
+                                                       // grab them and then 
immediately release them.
+                                                       while 
(testBufferPool.getNumberOfAvailableMemorySegments() < 100) {
+                                                               
Thread.sleep(100);
+                                                       }
+
+                                                       // Verify that no task 
is back pressured any more.
+                                                       for (int attempt = 0; 
attempt < numAttempts; attempt++) {
+                                                               try {
+                                                                       
OperatorBackPressureStats stats = triggerStatsSample(statsTracker, vertex);
+
+                                                                       
Assert.assertEquals(nextSampleId + attempt, stats.getSampleId());
+                                                                       
Assert.assertEquals(parallelism, stats.getNumberOfSubTasks());
+
+                                                                       // 
Verify that no task is back pressured
+                                                                       for 
(int i = 0; i < parallelism; i++) {
+                                                                               
Assert.assertEquals(0.0, stats.getBackPressureRatio(i), 0.0);
+                                                                       }
+
+                                                                       break;
+                                                               } catch 
(Throwable t) {
+                                                                       if 
(attempt == numAttempts - 1) {
+                                                                               
throw t;
+                                                                       } else {
+                                                                               
Thread.sleep(500);
+                                                                       }
+                                                               }
+                                                       }
+
+                                                       // Shut down
+                                                       jm.tell(new 
TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), testActor);
+
+                                                       // Cancel job
+                                                       jm.tell(new 
JobManagerMessages.CancelJob(jobGraph.getJobID()));
+
+                                                       // Response to removal 
notification
+                                                       expectMsgEquals(true);
+
+                                                       //
+                                                       // 3) Trigger stats for 
archived job
+                                                       //
+                                                       
statsTracker.invalidateOperatorStatsCache();
+                                                       
Assert.assertFalse("Unexpected trigger", 
statsTracker.triggerStackTraceSample(vertex));
+
+                                               } catch (Exception e) {
+                                                       e.printStackTrace();
+                                                       
Assert.fail(e.getMessage());
+                                               }
+                                       }
+                               };
+                       } finally {
+                               TestingUtils.stopActor(jobManger);
+                               TestingUtils.stopActor(taskManager);
+
+                               
highAvailabilityServices.closeAndCleanupAllData();
+
+                               for (Buffer buf : buffers) {
+                                       buf.recycle();
+                               }
+
+                               testBufferPool.lazyDestroy();
+                       }
+               }};
+       }
+
+       /**
+        * Triggers a new stats sample and blocks until its result is available.
+        *
+        * <p>The operator stats cache of the tracker is invalidated first, then a
+        * stack trace sample is triggered for the vertex; the call asserts that
+        * the trigger was accepted. After an initial 200 ms sleep the tracker is
+        * polled every 10 ms until it reports stats for the vertex. The polling
+        * loop itself has no timeout.
+        *
+        * @param statsTracker tracker used to trigger the sample and read the stats
+        * @param vertex job vertex to sample
+        * @return the back pressure stats the tracker reports for the vertex
+        * @throws InterruptedException if the thread is interrupted while sleeping
+        */
+       private OperatorBackPressureStats triggerStatsSample(
+                       BackPressureStatsTracker statsTracker,
+                       ExecutionJobVertex vertex) throws InterruptedException {
+
+               statsTracker.invalidateOperatorStatsCache();
+               Assert.assertTrue("Failed to trigger", 
statsTracker.triggerStackTraceSample(vertex));
+
+               // Sleep minimum duration (200 ms); presumably numSamples * delay of the tracker - TODO confirm
+               Thread.sleep(20 * 10);
+
+               Optional<OperatorBackPressureStats> stats;
+
+               // Poll until the tracker has stats for the vertex
+               while (!(stats = 
statsTracker.getOperatorBackPressureStats(vertex)).isPresent()) {
+                       Thread.sleep(10);
+               }
+
+               return stats.get();
+       }
+
+       /**
+        * A back pressured producer sharing a {@link BufferPool} with the
+        * test driver.
+        *
+        * <p>{@link #invoke()} requests a buffer from the shared
+        * {@code testBufferPool} with a blocking call, recycles it right away
+        * and then waits on a freshly created latch. No other code holds a
+        * reference to that latch, so it is never counted down and the task
+        * does not return normally after its first buffer.
+        */
+       public static class BackPressuredTask extends AbstractInvokable {
+
+               @Override
+               public void invoke() throws Exception {
+                       while (true) {
+                               Buffer buffer = 
testBufferPool.requestBufferBlocking();
+                               // Got a buffer: hand it straight back to the pool
+                               buffer.recycle();
+
+                               new CountDownLatch(1).await();
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
new file mode 100644
index 0000000..02f954a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Tests for the BackPressureStatsTracker.
+ *
+ * <p>The test runs against Mockito mocks of the sample coordinator, the
+ * execution graph and its vertices, and feeds the tracker a hand-built
+ * {@code StackTraceSample}.
+ */
+public class BackPressureStatsTrackerTest extends TestLogger {
+
+       /**
+        * Tests simple statistics with fake stack traces.
+        *
+        * <p>Steps: trigger a sample for a mocked job vertex with four subtasks
+        * and verify that the coordinator is called with the tracker's settings;
+        * trigger again while the first request is pending and expect it to be
+        * rejected; check that no stats are reported before the future completes;
+        * complete the future with a sample in which subtask {@code i} has
+        * {@code i + 1} of its 4 traces marked as back pressured; finally verify
+        * sample id, end timestamp, subtask count and the per-subtask ratios.
+        */
+       @Test
+       @SuppressWarnings("unchecked")
+       public void testTriggerStackTraceSample() throws Exception {
+               CompletableFuture<StackTraceSample> sampleFuture = new 
CompletableFuture<>();
+
+               StackTraceSampleCoordinator sampleCoordinator = 
Mockito.mock(StackTraceSampleCoordinator.class);
+               Mockito.when(sampleCoordinator.triggerStackTraceSample(
+                               Matchers.any(ExecutionVertex[].class),
+                               Matchers.anyInt(),
+                               Matchers.any(Time.class),
+                               Matchers.anyInt())).thenReturn(sampleFuture);
+
+               ExecutionGraph graph = Mockito.mock(ExecutionGraph.class);
+               Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING);
+
+               // Executor that runs every task directly on the calling thread
+               Mockito.when(graph.getFutureExecutor()).thenReturn(new 
Executor() {
+
+                       @Override
+                       public void execute(Runnable runnable) {
+                               runnable.run();
+                       }
+               });
+
+               ExecutionVertex[] taskVertices = new ExecutionVertex[4];
+
+               ExecutionJobVertex jobVertex = 
Mockito.mock(ExecutionJobVertex.class);
+               Mockito.when(jobVertex.getJobId()).thenReturn(new JobID());
+               Mockito.when(jobVertex.getJobVertexId()).thenReturn(new 
JobVertexID());
+               Mockito.when(jobVertex.getGraph()).thenReturn(graph);
+               
Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices);
+
+               taskVertices[0] = mockExecutionVertex(jobVertex, 0);
+               taskVertices[1] = mockExecutionVertex(jobVertex, 1);
+               taskVertices[2] = mockExecutionVertex(jobVertex, 2);
+               taskVertices[3] = mockExecutionVertex(jobVertex, 3);
+
+               int numSamples = 100;
+               Time delayBetweenSamples = Time.milliseconds(100L);
+
+               BackPressureStatsTracker tracker = new BackPressureStatsTracker(
+                               sampleCoordinator, 9999, numSamples, 
delayBetweenSamples);
+
+               // First trigger: must be accepted and forwarded to the coordinator
+               Assert.assertTrue("Failed to trigger", 
tracker.triggerStackTraceSample(jobVertex));
+
+               Mockito.verify(sampleCoordinator).triggerStackTraceSample(
+                               Matchers.eq(taskVertices),
+                               Matchers.eq(numSamples),
+                               Matchers.eq(delayBetweenSamples),
+                               
Matchers.eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
+
+               // Trigger again for pending request, should not fire
+               Assert.assertFalse("Unexpected trigger", 
tracker.triggerStackTraceSample(jobVertex));
+
+               
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+
+               Mockito.verify(sampleCoordinator).triggerStackTraceSample(
+                               Matchers.eq(taskVertices),
+                               Matchers.eq(numSamples),
+                               Matchers.eq(delayBetweenSamples),
+                               
Matchers.eq(BackPressureStatsTracker.MAX_STACK_TRACE_DEPTH));
+
+               
Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+
+               // Build the per-task traces of the sample that completes the future
+               Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new 
HashMap<>();
+               for (ExecutionVertex vertex : taskVertices) {
+                       List<StackTraceElement[]> taskTraces = new 
ArrayList<>();
+
+                       for (int i = 0; i < taskVertices.length; i++) {
+                               // Traces until sub task index are back 
pressured
+                               taskTraces.add(createStackTrace(i <= 
vertex.getParallelSubtaskIndex()));
+                       }
+
+                       
traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces);
+               }
+
+               int sampleId = 1231;
+               int endTime = 841;
+
+               StackTraceSample sample = new StackTraceSample(
+                               sampleId,
+                               0,
+                               endTime,
+                               traces);
+
+               // Complete the future with the sample
+               sampleFuture.complete(sample);
+
+               
Assert.assertTrue(tracker.getOperatorBackPressureStats(jobVertex).isPresent());
+
+               OperatorBackPressureStats stats = 
tracker.getOperatorBackPressureStats(jobVertex).get();
+
+               // Verify the stats
+               Assert.assertEquals(sampleId, stats.getSampleId());
+               Assert.assertEquals(endTime, stats.getEndTimestamp());
+               Assert.assertEquals(taskVertices.length, 
stats.getNumberOfSubTasks());
+
+               for (int i = 0; i < taskVertices.length; i++) {
+                       double ratio = stats.getBackPressureRatio(i);
+                       // Subtask i has traces 0..i back pressured, i.e. (i + 1) of 4
+                       Assert.assertEquals((i + 1) / ((double) 4), ratio, 0.0);
+               }
+       }
+
+       private StackTraceElement[] createStackTrace(boolean isBackPressure) {
+               if (isBackPressure) {
+                       return new StackTraceElement[] { new StackTraceElement(
+                                       
BackPressureStatsTracker.EXPECTED_CLASS_NAME,
+                                       
BackPressureStatsTracker.EXPECTED_METHOD_NAME,
+                                       "LocalBufferPool.java",
+                                       133) };
+               } else {
+                       return Thread.currentThread().getStackTrace();
+               }
+       }
+
+       private ExecutionVertex mockExecutionVertex(
+                       ExecutionJobVertex jobVertex,
+                       int subTaskIndex) {
+
+               Execution exec = Mockito.mock(Execution.class);
+               Mockito.when(exec.getAttemptId()).thenReturn(new 
ExecutionAttemptID());
+
+               JobVertexID id = jobVertex.getJobVertexId();
+
+               ExecutionVertex vertex = Mockito.mock(ExecutionVertex.class);
+               Mockito.when(vertex.getJobvertexId()).thenReturn(id);
+               
Mockito.when(vertex.getCurrentExecutionAttempt()).thenReturn(exec);
+               
Mockito.when(vertex.getParallelSubtaskIndex()).thenReturn(subTaskIndex);
+
+               return vertex;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
new file mode 100644
index 0000000..8fa302a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorITCase.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaJobManagerGateway;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.client.JobClient;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.BlockingNoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import akka.testkit.JavaTestKit;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.AllVerticesRunning;
+import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.ExecutionGraphFound;
+import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.RequestExecutionGraph;
+import static 
org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+
+/**
+ * Simple stack trace sampling test.
+ *
+ * <p>Starts a job manager and a task manager inside a local actor system
+ * that is created once for the class and shut down after all tests.
+ */
+public class StackTraceSampleCoordinatorITCase extends TestLogger {
+
+       private static ActorSystem testActorSystem;
+
+       @BeforeClass
+       public static void setup() {
+               testActorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
+       }
+
+       @AfterClass
+       public static void teardown() {
+               JavaTestKit.shutdownActorSystem(testActorSystem);
+       }
+
+       /**
+        * Tests that a cleared task is answered with a partial success
+        * response.
+        *
+        * <p>A job with a single {@code BlockingNoOpInvokable} vertex of
+        * parallelism 1 is submitted. Once all vertices are running, a stack
+        * trace sample with a very large number of samples and a 10 ms delay
+        * between samples is triggered, so that sampling is still in progress
+        * when the job is cancelled after {@code sleepTime} milliseconds.
+        *
+        * <p>If waiting for the sample future throws, the attempt is treated as
+        * "cancelled too early" and repeated; {@code sleepTime} starts at 100 ms
+        * and doubles on every attempt, for at most 10 attempts. Each attempt
+        * waits for the job removal notification before continuing.
+        *
+        * <p>NOTE(review): when all attempts throw, the loop simply ends and no
+        * assertion fails - confirm whether that is intended.
+        */
+       @Test
+       public void testTaskClearedWhileSampling() throws Exception {
+               new JavaTestKit(testActorSystem) {{
+                       final FiniteDuration deadline = new FiniteDuration(60, 
TimeUnit.SECONDS);
+
+                       // The JobGraph
+                       final JobGraph jobGraph = new JobGraph();
+                       final int parallelism = 1;
+
+                       final JobVertex task = new JobVertex("Task");
+                       task.setInvokableClass(BlockingNoOpInvokable.class);
+                       task.setParallelism(parallelism);
+
+                       jobGraph.addVertex(task);
+
+                       final Configuration config = new Configuration();
+
+                       final HighAvailabilityServices highAvailabilityServices 
= HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+                               config,
+                               TestingUtils.defaultExecutor());
+
+                       ActorGateway jobManger = null;
+                       ActorGateway taskManager = null;
+
+                       try {
+                               jobManger = TestingUtils.createJobManager(
+                                       testActorSystem,
+                                       TestingUtils.defaultExecutor(),
+                                       TestingUtils.defaultExecutor(),
+                                       config,
+                                       highAvailabilityServices);
+
+                               
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, parallelism);
+
+                               taskManager = TestingUtils.createTaskManager(
+                                       testActorSystem,
+                                       highAvailabilityServices,
+                                       config,
+                                       true,
+                                       true);
+
+                               final ActorGateway jm = jobManger;
+
+                               new Within(deadline) {
+                                       @Override
+                                       protected void run() {
+                                               try {
+                                                       ActorGateway testActor 
= new AkkaActorGateway(getTestActor(), 
HighAvailabilityServices.DEFAULT_LEADER_ID);
+
+                                                       int maxAttempts = 10;
+                                                       int sleepTime = 100;
+
+                                                       for (int i = 0; i < 
maxAttempts; i++, sleepTime *= 2) {
+                                                               // Submit the 
job and wait until it is running
+                                                               
JobClient.submitJobDetached(
+                                                                               
new AkkaJobManagerGateway(jm),
+                                                                               
config,
+                                                                               
jobGraph,
+                                                                               
Time.milliseconds(deadline.toMillis()),
+                                                                               
ClassLoader.getSystemClassLoader());
+
+                                                               jm.tell(new 
WaitForAllVerticesToBeRunning(jobGraph.getJobID()), testActor);
+
+                                                               
expectMsgEquals(new AllVerticesRunning(jobGraph.getJobID()));
+
+                                                               // Get the 
ExecutionGraph
+                                                               jm.tell(new 
RequestExecutionGraph(jobGraph.getJobID()), testActor);
+                                                               
ExecutionGraphFound executionGraphResponse =
+                                                                               
expectMsgClass(ExecutionGraphFound.class);
+                                                               ExecutionGraph 
executionGraph = (ExecutionGraph) executionGraphResponse.executionGraph();
+                                                               
ExecutionJobVertex vertex = executionGraph.getJobVertex(task.getID());
+
+                                                               
StackTraceSampleCoordinator coordinator = new StackTraceSampleCoordinator(
+                                                                               
testActorSystem.dispatcher(), 60000);
+
+                                                               
CompletableFuture<StackTraceSample> sampleFuture = 
coordinator.triggerStackTraceSample(
+                                                                       
vertex.getTaskVertices(),
+                                                                       // Do 
this often so we have a good
+                                                                       // 
chance of removing the job during
+                                                                       // 
sampling.
+                                                                       
21474700 * 100,
+                                                                       
Time.milliseconds(10L),
+                                                                       0);
+
+                                                               // Wait before 
cancelling so that some samples
+                                                               // are actually 
taken.
+                                                               
Thread.sleep(sleepTime);
+
+                                                               // Cancel job
+                                                               Future<?> 
removeFuture = jm.ask(
+                                                                               
new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()),
+                                                                               
remaining());
+
+                                                               jm.tell(new 
JobManagerMessages.CancelJob(jobGraph.getJobID()));
+
+                                                               try {
+                                                                       // 
Throws Exception on failure
+                                                                       
sampleFuture.get(remaining().toMillis(), TimeUnit.MILLISECONDS);
+
+                                                                       // OK, 
we are done. Got the expected
+                                                                       // 
partial result.
+                                                                       break;
+                                                               } catch 
(Throwable t) {
+                                                                       // We 
were too fast in cancelling the job.
+                                                                       // Fall 
through and retry.
+                                                               } finally {
+                                                                       
Await.ready(removeFuture, remaining());
+                                                               }
+                                                       }
+                                               } catch (Exception e) {
+                                                       e.printStackTrace();
+                                                       
Assert.fail(e.getMessage());
+                                               }
+                                       }
+                               };
+                       } finally {
+                               TestingUtils.stopActor(jobManger);
+                               TestingUtils.stopActor(taskManager);
+
+                               
highAvailabilityServices.closeAndCleanupAllData();
+                       }
+               }};
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
new file mode 100644
index 0000000..786b0ae
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/StackTraceSampleCoordinatorTest.java
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.backpressure;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.TriggerStackTraceSample;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorSystem;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Test for the {@link StackTraceSampleCoordinator}.
+ */
+public class StackTraceSampleCoordinatorTest extends TestLogger {
+
+       private static ActorSystem system;
+
+       private StackTraceSampleCoordinator coord;
+
+       /** Creates the local actor system shared by all tests of this class. */
+       @BeforeClass
+       public static void setUp() throws Exception {
+               final Configuration actorSystemConfig = new Configuration();
+               system = AkkaUtils.createLocalActorSystem(actorSystemConfig);
+       }
+
+       /** Shuts the shared actor system down again, unless it was never created. */
+       @AfterClass
+       public static void tearDown() throws Exception {
+               if (system == null) {
+                       return;
+               }
+
+               system.shutdown();
+       }
+
+       /** Replaces the coordinator under test with a fresh instance before every test. */
+       @Before
+       public void init() throws Exception {
+               final int sampleTimeout = 60000;
+               coord = new StackTraceSampleCoordinator(system.dispatcher(), sampleTimeout);
+       }
+
+       /**
+        * Tests simple trigger and collect of stack trace samples.
+        *
+        * <p>A sample is triggered for four running tasks. The test verifies the request sent
+        * to each execution attempt, reports the traces task by task and expects the future to
+        * complete only with the report of the last task.
+        */
+       @Test
+       public void testTriggerStackTraceSample() throws Exception {
+               ExecutionVertex[] vertices = new ExecutionVertex[] {
+                               mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+                               mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+                               mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true),
+                               mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
+               };
+
+               int numSamples = 1;
+               Time delayBetweenSamples = Time.milliseconds(100L);
+               int maxStackTraceDepth = 0;
+
+               CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+                               vertices, numSamples, delayBetweenSamples, maxStackTraceDepth);
+
+               // Verify messages have been sent
+               for (ExecutionVertex vertex : vertices) {
+                       ExecutionAttemptID expectedExecutionId = vertex
+                                       .getCurrentExecutionAttempt().getAttemptId();
+
+                       // NOTE(review): expectedMsg is built but never compared against anything.
+                       // It is kept so that the TriggerStackTraceSample import of this file stays
+                       // in use; the verification below checks the request arguments directly.
+                       TriggerStackTraceSample expectedMsg = new TriggerStackTraceSample(
+                                       0,
+                                       expectedExecutionId,
+                                       numSamples,
+                                       delayBetweenSamples,
+                                       maxStackTraceDepth);
+
+                       Mockito.verify(vertex.getCurrentExecutionAttempt())
+                               .requestStackTraceSample(
+                                       Matchers.eq(0),
+                                       Matchers.eq(numSamples),
+                                       Matchers.eq(delayBetweenSamples),
+                                       Matchers.eq(maxStackTraceDepth),
+                                       Matchers.any(Time.class));
+               }
+
+               Assert.assertFalse(sampleFuture.isDone());
+
+               StackTraceElement[] stackTraceSample = Thread.currentThread().getStackTrace();
+               List<StackTraceElement[]> traces = new ArrayList<>();
+               traces.add(stackTraceSample);
+               traces.add(stackTraceSample);
+               traces.add(stackTraceSample);
+
+               // Collect stack traces
+               for (int i = 0; i < vertices.length; i++) {
+                       ExecutionAttemptID executionId = vertices[i].getCurrentExecutionAttempt().getAttemptId();
+                       coord.collectStackTraces(0, executionId, traces);
+
+                       // The future must complete with the report of the last task, not earlier.
+                       boolean lastTaskReported = i == vertices.length - 1;
+                       Assert.assertEquals(
+                               "Unexpected completion state after report " + (i + 1) + " of " + vertices.length,
+                               lastTaskReported,
+                               sampleFuture.isDone());
+               }
+
+               // Verify completed stack trace sample
+               StackTraceSample sample = sampleFuture.get();
+
+               Assert.assertEquals(0, sample.getSampleId());
+               Assert.assertTrue(sample.getEndTime() >= sample.getStartTime());
+
+               Map<ExecutionAttemptID, List<StackTraceElement[]>> tracesByTask = sample.getStackTraces();
+
+               for (ExecutionVertex vertex : vertices) {
+                       ExecutionAttemptID executionId = vertex.getCurrentExecutionAttempt().getAttemptId();
+                       List<StackTraceElement[]> sampleTraces = tracesByTask.get(executionId);
+
+                       Assert.assertNotNull("Task not found", sampleTraces);
+                       // assertEquals uses the same List.equals comparison as before, but reports
+                       // both lists when they differ.
+                       Assert.assertEquals(traces, sampleTraces);
+               }
+
+               // Verify no more pending sample
+               Assert.assertEquals(0, coord.getNumberOfPendingSamples());
+
+               // Verify no error on late collect
+               coord.collectStackTraces(0, vertices[0].getCurrentExecutionAttempt().getAttemptId(), traces);
+       }
+
+       /**
+        * Tests triggering for non-running tasks fails the future.
+        *
+        * <p>One of the two sampled tasks is still {@code DEPLOYING}; the returned future is
+        * expected to be done right away with an {@link IllegalStateException} as cause.
+        */
+       @Test
+       public void testTriggerStackTraceSampleNotRunningTasks() throws Exception {
+               ExecutionVertex runningTask = mockExecutionVertex(
+                       new ExecutionAttemptID(), ExecutionState.RUNNING, true);
+               ExecutionVertex deployingTask = mockExecutionVertex(
+                       new ExecutionAttemptID(), ExecutionState.DEPLOYING, true);
+
+               Time delayBetweenSamples = Time.milliseconds(100L);
+               CompletableFuture<StackTraceSample> result = coord.triggerStackTraceSample(
+                       new ExecutionVertex[] {runningTask, deployingTask},
+                       1,
+                       delayBetweenSamples,
+                       0);
+
+               Assert.assertTrue(result.isDone());
+
+               try {
+                       result.get();
+                       Assert.fail("Expected exception.");
+               } catch (ExecutionException expected) {
+                       Throwable cause = expected.getCause();
+                       Assert.assertTrue(cause instanceof IllegalStateException);
+               }
+       }
+
+       /**
+        * Tests triggering for reset tasks fails the future.
+        *
+        * <p>The request to the second task returns a failed future (happens when the
+        * execution is reset); the sample future is expected to fail with a
+        * {@link RuntimeException} as cause.
+        */
+       @Test(timeout = 1000L)
+       public void testTriggerStackTraceSampleResetRunningTasks() throws Exception {
+               ExecutionVertex reachableTask = mockExecutionVertex(
+                       new ExecutionAttemptID(), ExecutionState.RUNNING, true);
+               // Fails to send the message to the execution (happens when execution is reset)
+               ExecutionVertex resetTask = mockExecutionVertex(
+                       new ExecutionAttemptID(), ExecutionState.RUNNING, false);
+
+               Time delayBetweenSamples = Time.milliseconds(100L);
+               CompletableFuture<StackTraceSample> result = coord.triggerStackTraceSample(
+                       new ExecutionVertex[] {reachableTask, resetTask},
+                       1,
+                       delayBetweenSamples,
+                       0);
+
+               try {
+                       result.get();
+                       Assert.fail("Expected exception.");
+               } catch (ExecutionException expected) {
+                       Throwable cause = expected.getCause();
+                       Assert.assertTrue(cause instanceof RuntimeException);
+               }
+       }
+
+       /**
+        * Tests that samples time out if they don't finish in time.
+        *
+        * <p>The mocked execution completes its request future exceptionally with a
+        * {@link TimeoutException} after {@code timeout} milliseconds. The test blocks on the
+        * sample future directly instead of sleeping and polling {@code isDone()}; the JUnit
+        * timeout of this method bounds the wait.
+        */
+       @Test(timeout = 1000L)
+       public void testTriggerStackTraceSampleTimeout() throws Exception {
+               int timeout = 100;
+
+               coord = new StackTraceSampleCoordinator(system.dispatcher(), timeout);
+
+               final ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+               try {
+
+                       ExecutionVertex[] vertices = new ExecutionVertex[]{
+                               mockExecutionVertexWithTimeout(
+                                       new ExecutionAttemptID(),
+                                       ExecutionState.RUNNING,
+                                       scheduledExecutorService,
+                                       timeout)
+                       };
+
+                       CompletableFuture<StackTraceSample> sampleFuture = coord.triggerStackTraceSample(
+                               vertices, 1, Time.milliseconds(100L), 0);
+
+                       try {
+                               // get() returns as soon as the future completes. If the sample never
+                               // times out, the JUnit timeout above fails the test.
+                               sampleFuture.get();
+                               Assert.fail("Expected exception.");
+                       } catch (ExecutionException e) {
+                               Assert.assertTrue(e.getCause().getCause().getMessage().contains("Timeout"));
+                       }
+
+                       // Collect after the timeout (should be ignored)
+                       ExecutionAttemptID executionId = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+                       coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+               } finally {
+                       scheduledExecutorService.shutdownNow();
+               }
+       }
+
+       /** Tests that collecting for a sample that was never triggered is ignored. */
+       @Test
+       public void testCollectStackTraceForUnknownSample() throws Exception {
+               ExecutionAttemptID anyTask = new ExecutionAttemptID();
+               List<StackTraceElement[]> noTraces = new ArrayList<StackTraceElement[]>();
+
+               // No sample has been triggered on this coordinator
+               coord.collectStackTraces(0, anyTask, noTraces);
+       }
+
+       /**
+        * Tests cancelling of a pending sample.
+        *
+        * <p>After the cancel call the future must be done and the coordinator must report no
+        * pending samples.
+        */
+       @Test
+       public void testCancelStackTraceSample() throws Exception {
+               ExecutionVertex runningTask = mockExecutionVertex(
+                       new ExecutionAttemptID(), ExecutionState.RUNNING, true);
+
+               CompletableFuture<StackTraceSample> pendingSample = coord.triggerStackTraceSample(
+                       new ExecutionVertex[] {runningTask}, 1, Time.milliseconds(100L), 0);
+               Assert.assertFalse(pendingSample.isDone());
+
+               // Cancel the sample with id 0
+               coord.cancelStackTraceSample(0, null);
+
+               // Verify completed
+               Assert.assertTrue(pendingSample.isDone());
+
+               // Verify no more pending samples
+               int stillPending = coord.getNumberOfPendingSamples();
+               Assert.assertEquals(0, stillPending);
+       }
+
+       /** Tests that a collect arriving after the sample was cancelled throws no Exception. */
+       @Test
+       public void testCollectStackTraceForCanceledSample() throws Exception {
+               ExecutionVertex runningTask = mockExecutionVertex(
+                       new ExecutionAttemptID(), ExecutionState.RUNNING, true);
+               ExecutionVertex[] vertices = {runningTask};
+
+               CompletableFuture<StackTraceSample> pendingSample = coord.triggerStackTraceSample(
+                       vertices, 1, Time.milliseconds(100L), 0);
+               Assert.assertFalse(pendingSample.isDone());
+
+               coord.cancelStackTraceSample(0, null);
+               Assert.assertTrue(pendingSample.isDone());
+
+               // Verify no error on late collect
+               ExecutionAttemptID lateReporter = vertices[0].getCurrentExecutionAttempt().getAttemptId();
+               List<StackTraceElement[]> noTraces = new ArrayList<StackTraceElement[]>();
+               coord.collectStackTraces(0, lateReporter, noTraces);
+       }
+
+       /**
+        * Tests that collecting for a pending sample that has been discarded (here: through
+        * cancellation) throws no Exception.
+        */
+       @Test
+       public void testCollectForDiscardedPendingSample() throws Exception {
+               ExecutionVertex sampledTask = mockExecutionVertex(
+                       new ExecutionAttemptID(), ExecutionState.RUNNING, true);
+
+               CompletableFuture<StackTraceSample> discardedSample = coord.triggerStackTraceSample(
+                       new ExecutionVertex[] {sampledTask}, 1, Time.milliseconds(100L), 0);
+
+               Assert.assertFalse(discardedSample.isDone());
+
+               coord.cancelStackTraceSample(0, null);
+
+               Assert.assertTrue(discardedSample.isDone());
+
+               // Verify no error on late collect
+               ExecutionAttemptID executionId = sampledTask.getCurrentExecutionAttempt().getAttemptId();
+               coord.collectStackTraces(0, executionId, new ArrayList<StackTraceElement[]>());
+       }
+
+
+       /** Tests that collecting for an unknown task fails. */
+       @Test(expected = IllegalArgumentException.class)
+       public void testCollectStackTraceForUnknownTask() throws Exception {
+               ExecutionVertex sampledTask = mockExecutionVertex(
+                       new ExecutionAttemptID(), ExecutionState.RUNNING, true);
+               coord.triggerStackTraceSample(
+                       new ExecutionVertex[] {sampledTask}, 1, Time.milliseconds(100L), 0);
+
+               // A fresh attempt ID that was not part of the triggered sample
+               ExecutionAttemptID unknownTask = new ExecutionAttemptID();
+               coord.collectStackTraces(0, unknownTask, new ArrayList<StackTraceElement[]>());
+       }
+
+       /**
+        * Tests that shut down fails all pending samples and future sample triggers.
+        *
+        * <p>Two samples are pending when the coordinator is shut down; both futures must be
+        * done afterwards, and a sample triggered after the shut down must return an already
+        * failed future.
+        */
+       @Test
+       public void testShutDown() throws Exception {
+               ExecutionVertex[] vertices = {
+                       mockExecutionVertex(new ExecutionAttemptID(), ExecutionState.RUNNING, true)
+               };
+
+               // Trigger two samples
+               List<CompletableFuture<StackTraceSample>> pendingSamples = new ArrayList<>();
+               for (int i = 0; i < 2; i++) {
+                       pendingSamples.add(
+                               coord.triggerStackTraceSample(vertices, 1, Time.milliseconds(100L), 0));
+               }
+
+               for (CompletableFuture<StackTraceSample> pending : pendingSamples) {
+                       Assert.assertFalse(pending.isDone());
+               }
+
+               // Shut down
+               coord.shutDown();
+
+               // Verify all completed
+               for (CompletableFuture<StackTraceSample> pending : pendingSamples) {
+                       Assert.assertTrue(pending.isDone());
+               }
+
+               // Verify new trigger returns failed future
+               CompletableFuture<StackTraceSample> rejected = coord.triggerStackTraceSample(
+                       vertices, 1, Time.milliseconds(100L), 0);
+               Assert.assertTrue(rejected.isDone());
+
+               try {
+                       rejected.get();
+                       Assert.fail("Expected exception.");
+               } catch (ExecutionException expected) {
+                       // we expected an exception here :-)
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Creates a mocked execution vertex whose current execution attempt reports the given
+        * attempt ID and state.
+        *
+        * @param executionId attempt ID returned by the mocked execution
+        * @param state execution state returned by the mocked execution
+        * @param sendSuccess if {@code true}, the stack trace sample request returns a completed
+        *                    future; otherwise a future failed with "Send failed."
+        * @return the mocked vertex
+        */
+       private ExecutionVertex mockExecutionVertex(
+                       ExecutionAttemptID executionId,
+                       ExecutionState state,
+                       boolean sendSuccess) {
+
+               // Decide up front what the stack trace sample request answers with
+               final CompletableFuture<StackTraceSampleResponse> requestResult;
+               if (sendSuccess) {
+                       requestResult = CompletableFuture.completedFuture(Mockito.mock(StackTraceSampleResponse.class));
+               } else {
+                       requestResult = new CompletableFuture<>();
+                       requestResult.completeExceptionally(new Exception("Send failed."));
+               }
+
+               final Execution execution = Mockito.mock(Execution.class);
+               Mockito.when(execution.getAttemptId()).thenReturn(executionId);
+               Mockito.when(execution.getState()).thenReturn(state);
+               Mockito.when(
+                               execution.requestStackTraceSample(
+                                       Matchers.anyInt(),
+                                       Matchers.anyInt(),
+                                       Matchers.any(Time.class),
+                                       Matchers.anyInt(),
+                                       Matchers.any(Time.class)))
+                       .thenReturn(requestResult);
+
+               final ExecutionVertex mockedVertex = Mockito.mock(ExecutionVertex.class);
+               Mockito.when(mockedVertex.getJobvertexId()).thenReturn(new JobVertexID());
+               Mockito.when(mockedVertex.getCurrentExecutionAttempt()).thenReturn(execution);
+
+               return mockedVertex;
+       }
+
+       /**
+        * Creates a mocked execution vertex whose stack trace sample request returns a future
+        * that is failed with a {@link TimeoutException} after the given delay.
+        *
+        * @param executionId attempt ID returned by the mocked execution
+        * @param state execution state returned by the mocked execution
+        * @param scheduledExecutorService executor on which the delayed failure is scheduled
+        * @param timeout delay in milliseconds before the request future is failed
+        * @return the mocked vertex
+        */
+       private ExecutionVertex mockExecutionVertexWithTimeout(
+               ExecutionAttemptID executionId,
+               ExecutionState state,
+               ScheduledExecutorService scheduledExecutorService,
+               int timeout) {
+
+               final CompletableFuture<StackTraceSampleResponse> pendingResponse = new CompletableFuture<>();
+
+               final Execution execution = Mockito.mock(Execution.class);
+               Mockito.when(execution.getAttemptId()).thenReturn(executionId);
+               Mockito.when(execution.getState()).thenReturn(state);
+               Mockito.when(
+                               execution.requestStackTraceSample(
+                                       Matchers.anyInt(),
+                                       Matchers.anyInt(),
+                                       Matchers.any(Time.class),
+                                       Matchers.anyInt(),
+                                       Matchers.any(Time.class)))
+                       .thenReturn(pendingResponse);
+
+               // Block-bodied lambda, so that the Runnable overload of schedule() is chosen
+               scheduledExecutorService.schedule(
+                       () -> {
+                               pendingResponse.completeExceptionally(new TimeoutException("Timeout"));
+                       },
+                       timeout,
+                       TimeUnit.MILLISECONDS);
+
+               final ExecutionVertex mockedVertex = Mockito.mock(ExecutionVertex.class);
+               Mockito.when(mockedVertex.getJobvertexId()).thenReturn(new JobVertexID());
+               Mockito.when(mockedVertex.getCurrentExecutionAttempt()).thenReturn(execution);
+
+               return mockedVertex;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
new file mode 100644
index 0000000..db91f58
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
+import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the CheckpointConfigHandler.
+ */
+public class CheckpointConfigHandlerTest {
+
+       /**
+        * Tests that the archivist emits exactly one JSON document under the checkpoint config
+        * path of the job, and that the document mirrors the checkpointing settings.
+        */
+       @Test
+       public void testArchiver() throws IOException {
+               JsonArchivist archivist = new CheckpointConfigHandler.CheckpointConfigJsonArchivist();
+               GraphAndSettings fixture = createGraphAndSettings(true, true);
+
+               JobID jobId = new JobID();
+               AccessExecutionGraph graph = fixture.graph;
+               when(graph.getJobID()).thenReturn(jobId);
+
+               Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(graph);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               String expectedPath = "/jobs/" + jobId + "/checkpoints/config";
+               Assert.assertEquals(expectedPath, archive.getPath());
+
+               JsonNode rootNode = new ObjectMapper().readTree(archive.getJson());
+
+               // General checkpointing settings
+               JobCheckpointingSettings settings = fixture.snapshottingSettings;
+               Assert.assertEquals("exactly_once", rootNode.get("mode").asText());
+               Assert.assertEquals(settings.getCheckpointInterval(), rootNode.get("interval").asLong());
+               Assert.assertEquals(settings.getCheckpointTimeout(), rootNode.get("timeout").asLong());
+               Assert.assertEquals(settings.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
+               Assert.assertEquals(settings.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
+
+               // Externalized checkpoint settings
+               JsonNode externalizedNode = rootNode.get("externalization");
+               Assert.assertNotNull(externalizedNode);
+
+               ExternalizedCheckpointSettings externalizedSettings = fixture.externalizedSettings;
+               Assert.assertEquals(
+                       externalizedSettings.externalizeCheckpoints(),
+                       externalizedNode.get("enabled").asBoolean());
+               Assert.assertEquals(
+                       externalizedSettings.deleteOnCancellation(),
+                       externalizedNode.get("delete_on_cancellation").asBoolean());
+       }
+
+       /** Tests that the handler returns exactly the checkpoint config path. */
+       @Test
+       public void testGetPaths() {
+               ExecutionGraphHolder graphHolder = mock(ExecutionGraphHolder.class);
+               CheckpointConfigHandler handler = new CheckpointConfigHandler(graphHolder, Executors.directExecutor());
+
+               String[] registeredPaths = handler.getPaths();
+
+               Assert.assertEquals(1, registeredPaths.length);
+               Assert.assertEquals("/jobs/:jobid/checkpoints/config", registeredPaths[0]);
+       }
+
+       /**
+        * Tests that the handler reports mode, interval, timeout, minimum pause and maximum
+        * concurrency of the job's settings and marks externalization as disabled.
+        */
+       @Test
+       public void testSimpleConfig() throws Exception {
+               GraphAndSettings fixture = createGraphAndSettings(false, true);
+               JobCheckpointingSettings expected = fixture.snapshottingSettings;
+
+               ExecutionGraphHolder graphHolder = mock(ExecutionGraphHolder.class);
+               CheckpointConfigHandler handler = new CheckpointConfigHandler(graphHolder, Executors.directExecutor());
+               String json = handler.handleRequest(fixture.graph, Collections.<String, String>emptyMap()).get();
+
+               JsonNode rootNode = new ObjectMapper().readTree(json);
+
+               assertEquals("exactly_once", rootNode.get("mode").asText());
+               assertEquals(expected.getCheckpointInterval(), rootNode.get("interval").asLong());
+               assertEquals(expected.getCheckpointTimeout(), rootNode.get("timeout").asLong());
+               assertEquals(expected.getMinPauseBetweenCheckpoints(), rootNode.get("min_pause").asLong());
+               assertEquals(expected.getMaxConcurrentCheckpoints(), rootNode.get("max_concurrent").asInt());
+
+               // The fixture was created without externalized checkpoints
+               JsonNode externalizedNode = rootNode.get("externalization");
+               assertNotNull(externalizedNode);
+               assertEquals(false, externalizedNode.get("enabled").asBoolean());
+       }
+
+       /**
+        * Tests that the isExactlyOnce flag is respected: settings created with the flag set to
+        * {@code false} are reported with mode "at_least_once".
+        */
+       @Test
+       public void testAtLeastOnce() throws Exception {
+               GraphAndSettings fixture = createGraphAndSettings(false, false);
+
+               ExecutionGraphHolder graphHolder = mock(ExecutionGraphHolder.class);
+               CheckpointConfigHandler handler = new CheckpointConfigHandler(graphHolder, Executors.directExecutor());
+               String json = handler.handleRequest(fixture.graph, Collections.<String, String>emptyMap()).get();
+
+               JsonNode modeNode = new ObjectMapper().readTree(json).get("mode");
+               assertEquals("at_least_once", modeNode.asText());
+       }
+
+       /**
+        * Tests that the externalized checkpoint settings are forwarded: the {@code enabled} and
+        * {@code delete_on_cancellation} fields of the response must match the settings.
+        */
+       @Test
+       public void testEnabledExternalizedCheckpointSettings() throws Exception {
+               GraphAndSettings fixture = createGraphAndSettings(true, false);
+               ExternalizedCheckpointSettings expected = fixture.externalizedSettings;
+
+               ExecutionGraphHolder graphHolder = mock(ExecutionGraphHolder.class);
+               CheckpointConfigHandler handler = new CheckpointConfigHandler(graphHolder, Executors.directExecutor());
+               String json = handler.handleRequest(fixture.graph, Collections.<String, String>emptyMap()).get();
+
+               JsonNode externalizedNode = new ObjectMapper().readTree(json).get("externalization");
+               assertNotNull(externalizedNode);
+
+               assertEquals(
+                       expected.externalizeCheckpoints(),
+                       externalizedNode.get("enabled").asBoolean());
+               assertEquals(
+                       expected.deleteOnCancellation(),
+                       externalizedNode.get("delete_on_cancellation").asBoolean());
+       }
+
+       /**
+        * Creates a mocked execution graph that returns freshly built checkpointing settings.
+        *
+        * @param externalized whether the settings use externalized checkpoints
+        * @param exactlyOnce value passed as the last argument of the settings constructor
+        * @return the mocked graph together with the settings it returns
+        */
+       private static GraphAndSettings createGraphAndSettings(boolean externalized, boolean exactlyOnce) {
+               final ExternalizedCheckpointSettings externalizedSetting;
+               if (externalized) {
+                       externalizedSetting = ExternalizedCheckpointSettings.externalizeCheckpoints(true);
+               } else {
+                       externalizedSetting = ExternalizedCheckpointSettings.none();
+               }
+
+               // Fixed values that the tests compare against the handler output
+               final long checkpointInterval = 18231823L;
+               final long checkpointTimeout = 996979L;
+               final long minPauseBetweenCheckpoints = 119191919L;
+               final int maxConcurrentCheckpoints = 12929329;
+
+               final JobCheckpointingSettings settings = new JobCheckpointingSettings(
+                       Collections.<JobVertexID>emptyList(),
+                       Collections.<JobVertexID>emptyList(),
+                       Collections.<JobVertexID>emptyList(),
+                       checkpointInterval,
+                       checkpointTimeout,
+                       minPauseBetweenCheckpoints,
+                       maxConcurrentCheckpoints,
+                       externalizedSetting,
+                       null,
+                       exactlyOnce);
+
+               final AccessExecutionGraph mockedGraph = mock(AccessExecutionGraph.class);
+               when(mockedGraph.getJobCheckpointingSettings()).thenReturn(settings);
+
+               return new GraphAndSettings(mockedGraph, settings, externalizedSetting);
+       }
+
+       /**
+        * Holder for an execution graph together with its checkpointing settings and its
+        * externalized checkpoint settings.
+        */
+       private static class GraphAndSettings {
+               public final AccessExecutionGraph graph;
+               public final JobCheckpointingSettings snapshottingSettings;
+               public final ExternalizedCheckpointSettings externalizedSettings;
+
+               public GraphAndSettings(
+                               AccessExecutionGraph executionGraph,
+                               JobCheckpointingSettings checkpointingSettings,
+                               ExternalizedCheckpointSettings externalizationSettings) {
+                       graph = executionGraph;
+                       snapshottingSettings = checkpointingSettings;
+                       externalizedSettings = externalizationSettings;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
new file mode 100644
index 0000000..04b1c55
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsCacheTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the CheckpointStatsCache.
+ */
+public class CheckpointStatsCacheTest {
+
+       @Test // a cache constructed with argument 0 is expected to retain nothing
+       public void testZeroSizeCache() throws Exception {
+               AbstractCheckpointStats checkpoint = createCheckpoint(0, 
CheckpointStatsStatus.COMPLETED);
+
+               CheckpointStatsCache cache = new CheckpointStatsCache(0);
+               cache.tryAdd(checkpoint);
+               assertNull(cache.tryGet(0L)); // the checkpoint that was just added must not be retrievable
+       }
+
+       @Test // cache constructed with argument 1: a newer COMPLETED entry displaces the older one, an IN_PROGRESS entry is not stored
+       public void testCacheAddAndGet() throws Exception {
+               AbstractCheckpointStats chk0 = createCheckpoint(0, 
CheckpointStatsStatus.COMPLETED);
+               AbstractCheckpointStats chk1 = createCheckpoint(1, 
CheckpointStatsStatus.COMPLETED);
+               AbstractCheckpointStats chk2 = createCheckpoint(2, 
CheckpointStatsStatus.IN_PROGRESS);
+
+               CheckpointStatsCache cache = new CheckpointStatsCache(1);
+               cache.tryAdd(chk0);
+               assertEquals(chk0, cache.tryGet(0));
+
+               cache.tryAdd(chk1);
+               assertNull(cache.tryGet(0)); // chk0 is expected to be gone once chk1 was added
+               assertEquals(chk1, cache.tryGet(1));
+
+               cache.tryAdd(chk2); // chk2 reports IN_PROGRESS
+               assertNull(cache.tryGet(2)); // expected not to be cached
+               assertNull(cache.tryGet(0));
+               assertEquals(chk1, cache.tryGet(1)); // chk1 is expected to survive the rejected add
+       }
+
+       private AbstractCheckpointStats createCheckpoint(long id, 
CheckpointStatsStatus status) { // builds a Mockito mock that reports the given ID and status
+               AbstractCheckpointStats checkpoint = 
mock(AbstractCheckpointStats.class);
+               when(checkpoint.getCheckpointId()).thenReturn(id);
+               when(checkpoint.getStatus()).thenReturn(status);
+               return checkpoint;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
new file mode 100644
index 0000000..e614608
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.checkpoints;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointProperties;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
+import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
+import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
+import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for the CheckpointStatsDetailsHandler.
+ */
+public class CheckpointStatsDetailsHandlerTest {
+
+       @Test // the archivist is expected to emit one JSON entry per checkpoint in the history, in history order
+       public void testArchiver() throws IOException {
+               JsonArchivist archivist = new 
CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist();
+
+               CompletedCheckpointStats completedCheckpoint = 
createCompletedCheckpoint();
+               FailedCheckpointStats failedCheckpoint = 
createFailedCheckpoint();
+               List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
+               checkpoints.add(failedCheckpoint); // failed first: the assertions below rely on this order
+               checkpoints.add(completedCheckpoint);
+
+               CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
+               when(history.getCheckpoints()).thenReturn(checkpoints);
+               CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
+               when(snapshot.getHistory()).thenReturn(history);
+
+               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+               when(graph.getJobID()).thenReturn(new JobID()); // one fixed JobID instance, so later getJobID() calls agree
+
+               ObjectMapper mapper = new ObjectMapper();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(graph);
+               Assert.assertEquals(2, archives.size()); // one archive per checkpoint
+
+               Iterator<ArchivedJson> iterator = archives.iterator();
+               ArchivedJson archive1 = iterator.next(); // expected to describe the failed checkpoint
+               Assert.assertEquals(
+                       "/jobs/" + graph.getJobID() + "/checkpoints/details/" + 
failedCheckpoint.getCheckpointId(),
+                       archive1.getPath());
+               compareFailedCheckpoint(failedCheckpoint, 
mapper.readTree(archive1.getJson()));
+
+               ArchivedJson archive2 = iterator.next(); // expected to describe the completed checkpoint
+               Assert.assertEquals(
+                       "/jobs/" + graph.getJobID() + "/checkpoints/details/" + 
completedCheckpoint.getCheckpointId(),
+                       archive2.getPath());
+               compareCompletedCheckpoint(completedCheckpoint, 
mapper.readTree(archive2.getJson()));
+       }
+
+       @Test // the handler is expected to expose exactly one REST path pattern
+       public void testGetPaths() {
+               CheckpointStatsDetailsHandler handler = new 
CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
+               String[] paths = handler.getPaths();
+               Assert.assertEquals(1, paths.length);
+               
Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid", paths[0]);
+       }
+
+       /**
+        * Tests that a request whose "checkpointid" parameter is not a number is answered with the empty JSON object "{}".
+        */
+       @Test
+       public void testIllegalCheckpointId() throws Exception {
+               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+               CheckpointStatsDetailsHandler handler = new 
CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
+               Map<String, String> params = new HashMap<>();
+               params.put("checkpointid", "illegal checkpoint"); // deliberately not a numeric value
+               String json = handler.handleRequest(graph, params).get();
+
+               assertEquals("{}", json);
+       }
+
+       /**
+        * Tests that a request without any "checkpointid" parameter is answered with the empty JSON object "{}".
+        */
+       @Test
+       public void testNoCheckpointIdParam() throws Exception {
+               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+               CheckpointStatsDetailsHandler handler = new 
CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
+               String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap()).get(); // empty parameter map: no checkpoint ID is supplied
+
+               assertEquals("{}", json);
+       }
+
+       /**
+        * Tests that looking up a checkpoint ID which the history does not contain is answered with "{}".
+        */
+       @Test
+       public void testCheckpointNotFound() throws Exception {
+               CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
+               when(history.getCheckpointById(anyLong())).thenReturn(null); // 
not found
+
+               CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
+               when(snapshot.getHistory()).thenReturn(history);
+
+               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+
+               CheckpointStatsDetailsHandler handler = new 
CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
+               Map<String, String> params = new HashMap<>();
+               params.put("checkpointid", "123"); // a numeric ID, so the request gets as far as the history lookup
+               String json = handler.handleRequest(graph, params).get();
+
+               assertEquals("{}", json);
+               verify(history, times(1)).getCheckpointById(anyLong()); // the history must have been consulted exactly once
+       }
+
+       /**
+        * Tests a checkpoint details request for an in progress checkpoint.
+        */
+       @Test
+       public void testCheckpointDetailsRequestInProgressCheckpoint() throws 
Exception {
+               PendingCheckpointStats checkpoint = 
mock(PendingCheckpointStats.class); // stubbed below with fixed values for every field that is asserted on
+               when(checkpoint.getCheckpointId()).thenReturn(1992139L);
+               
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+               
when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
+               when(checkpoint.getTriggerTimestamp()).thenReturn(1919191900L);
+               
when(checkpoint.getLatestAckTimestamp()).thenReturn(1977791901L);
+               when(checkpoint.getStateSize()).thenReturn(111939272822L);
+               when(checkpoint.getEndToEndDuration()).thenReturn(121191L);
+               when(checkpoint.getAlignmentBuffered()).thenReturn(1L);
+               when(checkpoint.getNumberOfSubtasks()).thenReturn(501);
+               
when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(101);
+
+               List<TaskStateStats> taskStats = new ArrayList<>(); // two mocked per-task entries
+               TaskStateStats task1 = createTaskStateStats();
+               TaskStateStats task2 = createTaskStateStats();
+               taskStats.add(task1);
+               taskStats.add(task2);
+
+               when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
+
+               JsonNode rootNode = triggerRequest(checkpoint); // run the handler and parse its JSON response
+
+               assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong()); // from here on: each stubbed top-level value must appear under its JSON key
+               assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
+               assertEquals(checkpoint.getProperties().isSavepoint(), 
rootNode.get("is_savepoint").asBoolean());
+               assertEquals(checkpoint.getTriggerTimestamp(), 
rootNode.get("trigger_timestamp").asLong());
+               assertEquals(checkpoint.getLatestAckTimestamp(), 
rootNode.get("latest_ack_timestamp").asLong());
+               assertEquals(checkpoint.getStateSize(), 
rootNode.get("state_size").asLong());
+               assertEquals(checkpoint.getEndToEndDuration(), 
rootNode.get("end_to_end_duration").asLong());
+               assertEquals(checkpoint.getAlignmentBuffered(), 
rootNode.get("alignment_buffered").asLong());
+               assertEquals(checkpoint.getNumberOfSubtasks(), 
rootNode.get("num_subtasks").asInt());
+               assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), 
rootNode.get("num_acknowledged_subtasks").asInt());
+
+               verifyTaskNodes(taskStats, rootNode); // per-task entries under the "tasks" node
+       }
+
+       /**
+        * Tests a checkpoint details request for a completed checkpoint.
+        */
+       @Test
+       public void testCheckpointDetailsRequestCompletedCheckpoint() throws 
Exception {
+               CompletedCheckpointStats checkpoint = 
createCompletedCheckpoint();
+
+               JsonNode rootNode = triggerRequest(checkpoint); // run the handler and parse its JSON response
+
+               compareCompletedCheckpoint(checkpoint, rootNode); // top-level fields
+
+               verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode); // per-task entries under the "tasks" node
+       }
+
+       /**
+        * Tests a checkpoint details request for a failed checkpoint.
+        */
+       @Test
+       public void testCheckpointDetailsRequestFailedCheckpoint() throws 
Exception {
+               FailedCheckpointStats checkpoint = createFailedCheckpoint();
+
+               JsonNode rootNode = triggerRequest(checkpoint); // run the handler and parse its JSON response
+
+               compareFailedCheckpoint(checkpoint, rootNode); // top-level fields
+
+               verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode); // per-task entries under the "tasks" node
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static CompletedCheckpointStats createCompletedCheckpoint() { // builds a COMPLETED checkpoint mock with fixed stub values and two task entries
+               CompletedCheckpointStats checkpoint = 
mock(CompletedCheckpointStats.class);
+               when(checkpoint.getCheckpointId()).thenReturn(1818213L);
+               
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
+               
when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+               when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
+               when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
+               when(checkpoint.getStateSize()).thenReturn(925281L);
+               when(checkpoint.getEndToEndDuration()).thenReturn(181819L);
+               when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L);
+               when(checkpoint.getNumberOfSubtasks()).thenReturn(181271);
+               
when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821);
+               when(checkpoint.isDiscarded()).thenReturn(true); // stubbed only on the completed variant
+               
when(checkpoint.getExternalPath()).thenReturn("checkpoint-external-path");
+
+               List<TaskStateStats> taskStats = new ArrayList<>(); // two mocked per-task entries
+               TaskStateStats task1 = createTaskStateStats();
+               TaskStateStats task2 = createTaskStateStats();
+               taskStats.add(task1);
+               taskStats.add(task2);
+
+               when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
+
+               return checkpoint;
+       }
+
+       private static void compareCompletedCheckpoint(CompletedCheckpointStats 
checkpoint, JsonNode rootNode) { // asserts that each top-level JSON field equals the matching getter of the checkpoint
+               assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong());
+               assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
+               assertEquals(checkpoint.getProperties().isSavepoint(), 
rootNode.get("is_savepoint").asBoolean());
+               assertEquals(checkpoint.getTriggerTimestamp(), 
rootNode.get("trigger_timestamp").asLong());
+               assertEquals(checkpoint.getLatestAckTimestamp(), 
rootNode.get("latest_ack_timestamp").asLong());
+               assertEquals(checkpoint.getStateSize(), 
rootNode.get("state_size").asLong());
+               assertEquals(checkpoint.getEndToEndDuration(), 
rootNode.get("end_to_end_duration").asLong());
+               assertEquals(checkpoint.getAlignmentBuffered(), 
rootNode.get("alignment_buffered").asLong());
+               assertEquals(checkpoint.isDiscarded(), 
rootNode.get("discarded").asBoolean()); // checked only for completed checkpoints
+               assertEquals(checkpoint.getExternalPath(), 
rootNode.get("external_path").asText()); // checked only for completed checkpoints
+               assertEquals(checkpoint.getNumberOfSubtasks(), 
rootNode.get("num_subtasks").asInt());
+               assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), 
rootNode.get("num_acknowledged_subtasks").asInt());
+       }
+
+       private static FailedCheckpointStats createFailedCheckpoint() { // builds a FAILED checkpoint mock with fixed stub values and two task entries
+               FailedCheckpointStats checkpoint = 
mock(FailedCheckpointStats.class);
+               when(checkpoint.getCheckpointId()).thenReturn(1818214L);
+               
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
+               
when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
+               when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
+               when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
+               when(checkpoint.getStateSize()).thenReturn(925281L);
+               when(checkpoint.getEndToEndDuration()).thenReturn(181819L);
+               when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L);
+               when(checkpoint.getNumberOfSubtasks()).thenReturn(181271);
+               
when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821);
+               
when(checkpoint.getFailureTimestamp()).thenReturn(123012890312093L);
+               
when(checkpoint.getFailureMessage()).thenReturn("failure-message");
+
+               List<TaskStateStats> taskStats = new ArrayList<>(); // two mocked per-task entries
+               TaskStateStats task1 = createTaskStateStats();
+               TaskStateStats task2 = createTaskStateStats();
+               taskStats.add(task1);
+               taskStats.add(task2);
+
+               when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
+
+               return checkpoint;
+       }
+
+       private static void compareFailedCheckpoint(FailedCheckpointStats 
checkpoint, JsonNode rootNode) { // asserts that each top-level JSON field equals the matching getter of the checkpoint
+               assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong());
+               assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
+               assertEquals(checkpoint.getProperties().isSavepoint(), 
rootNode.get("is_savepoint").asBoolean());
+               assertEquals(checkpoint.getTriggerTimestamp(), 
rootNode.get("trigger_timestamp").asLong());
+               assertEquals(checkpoint.getLatestAckTimestamp(), 
rootNode.get("latest_ack_timestamp").asLong());
+               assertEquals(checkpoint.getStateSize(), 
rootNode.get("state_size").asLong());
+               assertEquals(checkpoint.getEndToEndDuration(), 
rootNode.get("end_to_end_duration").asLong());
+               assertEquals(checkpoint.getAlignmentBuffered(), 
rootNode.get("alignment_buffered").asLong());
+               assertEquals(checkpoint.getFailureTimestamp(), 
rootNode.get("failure_timestamp").asLong()); // checked only for failed checkpoints
+               assertEquals(checkpoint.getFailureMessage(), 
rootNode.get("failure_message").asText()); // checked only for failed checkpoints
+               assertEquals(checkpoint.getNumberOfSubtasks(), 
rootNode.get("num_subtasks").asInt());
+               assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), 
rootNode.get("num_acknowledged_subtasks").asInt());
+       }
+
+       private static JsonNode triggerRequest(AbstractCheckpointStats 
checkpoint) throws Exception { // runs the handler against a mocked graph whose history yields the given checkpoint; returns the parsed JSON reply
+               CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
+               
when(history.getCheckpointById(anyLong())).thenReturn(checkpoint); // any checkpoint ID resolves to the given checkpoint
+               CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
+               when(snapshot.getHistory()).thenReturn(history);
+
+               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+
+               CheckpointStatsDetailsHandler handler = new 
CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
+               Map<String, String> params = new HashMap<>();
+               params.put("checkpointid", "123"); // the concrete ID does not select the checkpoint: the history stub matches anyLong()
+               String json = handler.handleRequest(graph, params).get();
+
+               ObjectMapper mapper = new ObjectMapper();
+               return mapper.readTree(json);
+       }
+
+       private static void verifyTaskNodes(Collection<TaskStateStats> tasks, 
JsonNode parentNode) { // asserts that parentNode's "tasks" object holds a matching entry for every given task
+               for (TaskStateStats task : tasks) {
+                       long duration = 
ThreadLocalRandom.current().nextInt(128); // NOTE(review): for tasks built by createTaskStateStats, getEndToEndDuration(anyLong()) is stubbed, so this random offset cannot change the expected value
+
+                       JsonNode taskNode = 
parentNode.get("tasks").get(task.getJobVertexId().toString()); // entries are keyed by the job vertex ID string
+                       assertEquals(task.getLatestAckTimestamp(), 
taskNode.get("latest_ack_timestamp").asLong());
+                       assertEquals(task.getStateSize(), 
taskNode.get("state_size").asLong());
+                       
assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), 
taskNode.get("end_to_end_duration").asLong());
+                       assertEquals(task.getAlignmentBuffered(), 
taskNode.get("alignment_buffered").asLong());
+                       assertEquals(task.getNumberOfSubtasks(), 
taskNode.get("num_subtasks").asInt());
+                       assertEquals(task.getNumberOfAcknowledgedSubtasks(), 
taskNode.get("num_acknowledged_subtasks").asInt());
+               }
+       }
+
+       private static TaskStateStats createTaskStateStats() { // builds a TaskStateStats mock with a fresh JobVertexID and random stub values in [1, 1024]
+               ThreadLocalRandom rand = ThreadLocalRandom.current();
+
+               TaskStateStats task = mock(TaskStateStats.class); // each random value below is drawn once at stubbing time, so repeated getter calls return the same number
+               when(task.getJobVertexId()).thenReturn(new JobVertexID());
+               
when(task.getLatestAckTimestamp()).thenReturn(rand.nextLong(1024) + 1);
+               when(task.getStateSize()).thenReturn(rand.nextLong(1024) + 1);
+               
when(task.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024) + 1); // same result regardless of the argument passed
+               
when(task.getAlignmentBuffered()).thenReturn(rand.nextLong(1024) + 1);
+               when(task.getNumberOfSubtasks()).thenReturn(rand.nextInt(1024) 
+ 1);
+               
when(task.getNumberOfAcknowledgedSubtasks()).thenReturn(rand.nextInt(1024) + 1);
+               return task;
+       }
+}

Reply via email to