http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
deleted file mode 100644
index 31c2212..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-
-import static org.mockito.Mockito.mock;
-
-/**
- * Tests for the SubtasksTimesHandler.
- */
-public class SubtasksTimesHandlerTest {
-
-       @Test
-       public void testArchiver() throws Exception {
-               JsonArchivist archivist = new 
SubtasksTimesHandler.SubtasksTimesJsonArchivist();
-               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
-               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
-               AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
-               Assert.assertEquals(1, archives.size());
-
-               ArchivedJson archive = archives.iterator().next();
-               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/vertices/" + originalTask.getJobVertexId() + "/subtasktimes", 
archive.getPath());
-               compareSubtaskTimes(originalTask, originalAttempt, 
archive.getJson());
-       }
-
-       @Test
-       public void testGetPaths() {
-               SubtasksTimesHandler handler = new 
SubtasksTimesHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]);
-       }
-
-       @Test
-       public void testJsonGeneration() throws Exception {
-               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
-               AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
-               String json = 
SubtasksTimesHandler.createSubtaskTimesJson(originalTask);
-
-               compareSubtaskTimes(originalTask, originalAttempt, json);
-       }
-
-       private static void compareSubtaskTimes(AccessExecutionJobVertex 
originalTask, AccessExecution originalAttempt, String json) throws IOException {
-               JsonNode result = 
ArchivedJobGenerationUtils.MAPPER.readTree(json);
-
-               Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());
-               Assert.assertEquals(originalTask.getName(), 
result.get("name").asText());
-               Assert.assertTrue(result.get("now").asLong() > 0L);
-
-               ArrayNode subtasks = (ArrayNode) result.get("subtasks");
-
-               JsonNode subtask = subtasks.get(0);
-               Assert.assertEquals(0, subtask.get("subtask").asInt());
-               
Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(),
 subtask.get("host").asText());
-               
Assert.assertEquals(originalAttempt.getStateTimestamp(originalAttempt.getState())
 - originalAttempt.getStateTimestamp(ExecutionState.SCHEDULED), 
subtask.get("duration").asLong());
-
-               JsonNode timestamps = subtask.get("timestamps");
-
-               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CREATED), 
timestamps.get(ExecutionState.CREATED.name()).asLong());
-               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.SCHEDULED),
 timestamps.get(ExecutionState.SCHEDULED.name()).asLong());
-               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING),
 timestamps.get(ExecutionState.DEPLOYING.name()).asLong());
-               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.RUNNING), 
timestamps.get(ExecutionState.RUNNING.name()).asLong());
-               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.FINISHED), 
timestamps.get(ExecutionState.FINISHED.name()).asLong());
-               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CANCELING),
 timestamps.get(ExecutionState.CANCELING.name()).asLong());
-               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CANCELED), 
timestamps.get(ExecutionState.CANCELED.name()).asLong());
-               
Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.FAILED), 
timestamps.get(ExecutionState.FAILED.name()).asLong());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
deleted file mode 100644
index faeff13..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagerLogHandlerTest.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.blob.BlobKey;
-import org.apache.flink.runtime.blob.VoidBlobStore;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.instance.Instance;
-import org.apache.flink.runtime.instance.InstanceID;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.jobmaster.JobManagerGateway;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
-import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.atomic.AtomicReference;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.isA;
-import static org.powermock.api.mockito.PowerMockito.mock;
-import static org.powermock.api.mockito.PowerMockito.when;
-
-/**
- * Tests for the TaskManagerLogHandler.
- */
-public class TaskManagerLogHandlerTest {
-       @Test
-       public void testGetPaths() {
-               TaskManagerLogHandler handlerLog = new TaskManagerLogHandler(
-                       mock(GatewayRetriever.class),
-                       Executors.directExecutor(),
-                       CompletableFuture.completedFuture("/jm/address"),
-                       TestingUtils.TIMEOUT(),
-                       TaskManagerLogHandler.FileMode.LOG,
-                       new Configuration(),
-                       new VoidBlobStore());
-               String[] pathsLog = handlerLog.getPaths();
-               Assert.assertEquals(1, pathsLog.length);
-               Assert.assertEquals("/taskmanagers/:taskmanagerid/log", 
pathsLog[0]);
-
-               TaskManagerLogHandler handlerOut = new TaskManagerLogHandler(
-                       mock(GatewayRetriever.class),
-                       Executors.directExecutor(),
-                       CompletableFuture.completedFuture("/jm/address"),
-                       TestingUtils.TIMEOUT(),
-                       TaskManagerLogHandler.FileMode.STDOUT,
-                       new Configuration(),
-                       new VoidBlobStore());
-               String[] pathsOut = handlerOut.getPaths();
-               Assert.assertEquals(1, pathsOut.length);
-               Assert.assertEquals("/taskmanagers/:taskmanagerid/stdout", 
pathsOut[0]);
-       }
-
-       @Test
-       public void testLogFetchingFailure() throws Exception {
-               // ========= setup TaskManager 
=================================================================================
-               InstanceID tmID = new InstanceID();
-               ResourceID tmRID = new ResourceID(tmID.toString());
-               TaskManagerGateway taskManagerGateway = 
mock(TaskManagerGateway.class);
-               when(taskManagerGateway.getAddress()).thenReturn("/tm/address");
-
-               Instance taskManager = mock(Instance.class);
-               when(taskManager.getId()).thenReturn(tmID);
-               when(taskManager.getTaskManagerID()).thenReturn(tmRID);
-               
when(taskManager.getTaskManagerGateway()).thenReturn(taskManagerGateway);
-               CompletableFuture<BlobKey> future = new CompletableFuture<>();
-               future.completeExceptionally(new IOException("failure"));
-               
when(taskManagerGateway.requestTaskManagerLog(any(Time.class))).thenReturn(future);
-
-               // ========= setup JobManager 
==================================================================================
-
-               JobManagerGateway jobManagerGateway = 
mock(JobManagerGateway.class);
-               
when(jobManagerGateway.requestBlobServerPort(any(Time.class))).thenReturn(CompletableFuture.completedFuture(1337));
-               when(jobManagerGateway.getHostname()).thenReturn("localhost");
-               
when(jobManagerGateway.requestTaskManagerInstance(any(InstanceID.class), 
any(Time.class))).thenReturn(
-                       
CompletableFuture.completedFuture(Optional.of(taskManager)));
-
-               GatewayRetriever<JobManagerGateway> retriever = 
mock(GatewayRetriever.class);
-               when(retriever.getNow())
-                       .thenReturn(Optional.of(jobManagerGateway));
-
-               TaskManagerLogHandler handler = new TaskManagerLogHandler(
-                       retriever,
-                       Executors.directExecutor(),
-                       CompletableFuture.completedFuture("/jm/address"),
-                       TestingUtils.TIMEOUT(),
-                       TaskManagerLogHandler.FileMode.LOG,
-                       new Configuration(),
-                       new VoidBlobStore());
-
-               final AtomicReference<String> exception = new 
AtomicReference<>();
-
-               ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
-               when(ctx.write(isA(ByteBuf.class))).thenAnswer(new 
Answer<Object>() {
-                       @Override
-                       public Object answer(InvocationOnMock invocationOnMock) 
throws Throwable {
-                               ByteBuf data = 
invocationOnMock.getArgumentAt(0, ByteBuf.class);
-                               exception.set(new String(data.array(), 
ConfigConstants.DEFAULT_CHARSET));
-                               return null;
-                       }
-               });
-
-               Map<String, String> pathParams = new HashMap<>();
-               pathParams.put(TaskManagersHandler.TASK_MANAGER_ID_KEY, 
tmID.toString());
-               Routed routed = mock(Routed.class);
-               when(routed.pathParams()).thenReturn(pathParams);
-               when(routed.request()).thenReturn(new 
DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/taskmanagers/" + 
tmID + "/log"));
-
-               handler.respondAsLeader(ctx, routed, jobManagerGateway);
-
-               Assert.assertEquals("Fetching TaskManager log failed.", 
exception.get());
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
deleted file mode 100644
index e3a71a1..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/TaskManagersHandlerTest.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers;
-
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.Executors;
-
-import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-
-/**
- * Tests for the TaskManagersHandler.
- */
-public class TaskManagersHandlerTest {
-       @Test
-       public void testGetPaths() {
-               TaskManagersHandler handler = new 
TaskManagersHandler(Executors.directExecutor(), Time.seconds(0L), null);
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(2, paths.length);
-               List<String> pathsList = Lists.newArrayList(paths);
-               Assert.assertTrue(pathsList.contains("/taskmanagers"));
-               
Assert.assertTrue(pathsList.contains("/taskmanagers/:taskmanagerid"));
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
deleted file mode 100644
index 47298be..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
-import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the CheckpointConfigHandler.
- */
-public class CheckpointConfigHandlerTest {
-
-       @Test
-       public void testArchiver() throws IOException {
-               JsonArchivist archivist = new 
CheckpointConfigHandler.CheckpointConfigJsonArchivist();
-               GraphAndSettings graphAndSettings = 
createGraphAndSettings(true, true);
-
-               AccessExecutionGraph graph = graphAndSettings.graph;
-               when(graph.getJobID()).thenReturn(new JobID());
-               JobCheckpointingSettings settings = 
graphAndSettings.snapshottingSettings;
-               ExternalizedCheckpointSettings externalizedSettings = 
graphAndSettings.externalizedSettings;
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(graph);
-               Assert.assertEquals(1, archives.size());
-               ArchivedJson archive = archives.iterator().next();
-               Assert.assertEquals("/jobs/" + graph.getJobID() + 
"/checkpoints/config", archive.getPath());
-
-               ObjectMapper mapper = new ObjectMapper();
-               JsonNode rootNode = mapper.readTree(archive.getJson());
-
-               Assert.assertEquals("exactly_once", 
rootNode.get("mode").asText());
-               Assert.assertEquals(settings.getCheckpointInterval(), 
rootNode.get("interval").asLong());
-               Assert.assertEquals(settings.getCheckpointTimeout(), 
rootNode.get("timeout").asLong());
-               Assert.assertEquals(settings.getMinPauseBetweenCheckpoints(), 
rootNode.get("min_pause").asLong());
-               Assert.assertEquals(settings.getMaxConcurrentCheckpoints(), 
rootNode.get("max_concurrent").asInt());
-
-               JsonNode externalizedNode = rootNode.get("externalization");
-               Assert.assertNotNull(externalizedNode);
-               
Assert.assertEquals(externalizedSettings.externalizeCheckpoints(), 
externalizedNode.get("enabled").asBoolean());
-               
Assert.assertEquals(externalizedSettings.deleteOnCancellation(), 
externalizedNode.get("delete_on_cancellation").asBoolean());
-
-       }
-
-       @Test
-       public void testGetPaths() {
-               CheckpointConfigHandler handler = new 
CheckpointConfigHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/jobs/:jobid/checkpoints/config", 
paths[0]);
-       }
-
-       /**
-        * Tests a simple config.
-        */
-       @Test
-       public void testSimpleConfig() throws Exception {
-               GraphAndSettings graphAndSettings = 
createGraphAndSettings(false, true);
-
-               AccessExecutionGraph graph = graphAndSettings.graph;
-               JobCheckpointingSettings settings = 
graphAndSettings.snapshottingSettings;
-
-               CheckpointConfigHandler handler = new 
CheckpointConfigHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
-               String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap()).get();
-
-               ObjectMapper mapper = new ObjectMapper();
-               JsonNode rootNode = mapper.readTree(json);
-
-               assertEquals("exactly_once", rootNode.get("mode").asText());
-               assertEquals(settings.getCheckpointInterval(), 
rootNode.get("interval").asLong());
-               assertEquals(settings.getCheckpointTimeout(), 
rootNode.get("timeout").asLong());
-               assertEquals(settings.getMinPauseBetweenCheckpoints(), 
rootNode.get("min_pause").asLong());
-               assertEquals(settings.getMaxConcurrentCheckpoints(), 
rootNode.get("max_concurrent").asInt());
-
-               JsonNode externalizedNode = rootNode.get("externalization");
-               assertNotNull(externalizedNode);
-               assertEquals(false, 
externalizedNode.get("enabled").asBoolean());
-       }
-
-       /**
-        * Tests that the isExactlyOnce flag is respected.
-        */
-       @Test
-       public void testAtLeastOnce() throws Exception {
-               GraphAndSettings graphAndSettings = 
createGraphAndSettings(false, false);
-
-               AccessExecutionGraph graph = graphAndSettings.graph;
-
-               CheckpointConfigHandler handler = new 
CheckpointConfigHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
-               String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap()).get();
-
-               ObjectMapper mapper = new ObjectMapper();
-               JsonNode rootNode = mapper.readTree(json);
-
-               assertEquals("at_least_once", rootNode.get("mode").asText());
-       }
-
-       /**
-        * Tests that the externalized checkpoint settings are forwarded.
-        */
-       @Test
-       public void testEnabledExternalizedCheckpointSettings() throws 
Exception {
-               GraphAndSettings graphAndSettings = 
createGraphAndSettings(true, false);
-
-               AccessExecutionGraph graph = graphAndSettings.graph;
-               ExternalizedCheckpointSettings externalizedSettings = 
graphAndSettings.externalizedSettings;
-
-               CheckpointConfigHandler handler = new 
CheckpointConfigHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
-               String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap()).get();
-
-               ObjectMapper mapper = new ObjectMapper();
-               JsonNode externalizedNode = 
mapper.readTree(json).get("externalization");
-               assertNotNull(externalizedNode);
-               assertEquals(externalizedSettings.externalizeCheckpoints(), 
externalizedNode.get("enabled").asBoolean());
-               assertEquals(externalizedSettings.deleteOnCancellation(), 
externalizedNode.get("delete_on_cancellation").asBoolean());
-       }
-
-       private static GraphAndSettings createGraphAndSettings(boolean 
externalized, boolean exactlyOnce) {
-               long interval = 18231823L;
-               long timeout = 996979L;
-               long minPause = 119191919L;
-               int maxConcurrent = 12929329;
-               ExternalizedCheckpointSettings externalizedSetting = 
externalized
-                       ? 
ExternalizedCheckpointSettings.externalizeCheckpoints(true)
-                       : ExternalizedCheckpointSettings.none();
-
-               JobCheckpointingSettings settings = new 
JobCheckpointingSettings(
-                       Collections.<JobVertexID>emptyList(),
-                       Collections.<JobVertexID>emptyList(),
-                       Collections.<JobVertexID>emptyList(),
-                       interval,
-                       timeout,
-                       minPause,
-                       maxConcurrent,
-                       externalizedSetting,
-                       null,
-                       exactlyOnce);
-
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               when(graph.getJobCheckpointingSettings()).thenReturn(settings);
-
-               return new GraphAndSettings(graph, settings, 
externalizedSetting);
-       }
-
-       private static class GraphAndSettings {
-               public final AccessExecutionGraph graph;
-               public final JobCheckpointingSettings snapshottingSettings;
-               public final ExternalizedCheckpointSettings 
externalizedSettings;
-
-               public GraphAndSettings(
-                               AccessExecutionGraph graph,
-                               JobCheckpointingSettings snapshottingSettings,
-                               ExternalizedCheckpointSettings 
externalizedSettings) {
-                       this.graph = graph;
-                       this.snapshottingSettings = snapshottingSettings;
-                       this.externalizedSettings = externalizedSettings;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java
deleted file mode 100644
index bdb3faf..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsCacheTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the CheckpointStatsCache.
- */
-public class CheckpointStatsCacheTest {
-
-       @Test
-       public void testZeroSizeCache() throws Exception {
-               AbstractCheckpointStats checkpoint = createCheckpoint(0, 
CheckpointStatsStatus.COMPLETED);
-
-               CheckpointStatsCache cache = new CheckpointStatsCache(0);
-               cache.tryAdd(checkpoint);
-               assertNull(cache.tryGet(0L));
-       }
-
-       @Test
-       public void testCacheAddAndGet() throws Exception {
-               AbstractCheckpointStats chk0 = createCheckpoint(0, 
CheckpointStatsStatus.COMPLETED);
-               AbstractCheckpointStats chk1 = createCheckpoint(1, 
CheckpointStatsStatus.COMPLETED);
-               AbstractCheckpointStats chk2 = createCheckpoint(2, 
CheckpointStatsStatus.IN_PROGRESS);
-
-               CheckpointStatsCache cache = new CheckpointStatsCache(1);
-               cache.tryAdd(chk0);
-               assertEquals(chk0, cache.tryGet(0));
-
-               cache.tryAdd(chk1);
-               assertNull(cache.tryGet(0));
-               assertEquals(chk1, cache.tryGet(1));
-
-               cache.tryAdd(chk2);
-               assertNull(cache.tryGet(2));
-               assertNull(cache.tryGet(0));
-               assertEquals(chk1, cache.tryGet(1));
-       }
-
-       private AbstractCheckpointStats createCheckpoint(long id, 
CheckpointStatsStatus status) {
-               AbstractCheckpointStats checkpoint = 
mock(AbstractCheckpointStats.class);
-               when(checkpoint.getCheckpointId()).thenReturn(id);
-               when(checkpoint.getStatus()).thenReturn(status);
-               return checkpoint;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
deleted file mode 100644
index f16d623..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ /dev/null
@@ -1,358 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointProperties;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
-import org.apache.flink.runtime.checkpoint.TaskStateStats;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ThreadLocalRandom;
-
-import static org.junit.Assert.assertEquals;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the CheckpointStatsDetailsHandler.
- */
-public class CheckpointStatsDetailsHandlerTest {
-
-       @Test
-       public void testArchiver() throws IOException {
-               JsonArchivist archivist = new 
CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist();
-
-               CompletedCheckpointStats completedCheckpoint = 
createCompletedCheckpoint();
-               FailedCheckpointStats failedCheckpoint = 
createFailedCheckpoint();
-               List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
-               checkpoints.add(failedCheckpoint);
-               checkpoints.add(completedCheckpoint);
-
-               CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
-               when(history.getCheckpoints()).thenReturn(checkpoints);
-               CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
-               when(snapshot.getHistory()).thenReturn(history);
-
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-               when(graph.getJobID()).thenReturn(new JobID());
-
-               ObjectMapper mapper = new ObjectMapper();
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(graph);
-               Assert.assertEquals(2, archives.size());
-
-               Iterator<ArchivedJson> iterator = archives.iterator();
-               ArchivedJson archive1 = iterator.next();
-               Assert.assertEquals(
-                       "/jobs/" + graph.getJobID() + "/checkpoints/details/" + 
failedCheckpoint.getCheckpointId(),
-                       archive1.getPath());
-               compareFailedCheckpoint(failedCheckpoint, 
mapper.readTree(archive1.getJson()));
-
-               ArchivedJson archive2 = iterator.next();
-               Assert.assertEquals(
-                       "/jobs/" + graph.getJobID() + "/checkpoints/details/" + 
completedCheckpoint.getCheckpointId(),
-                       archive2.getPath());
-               compareCompletedCheckpoint(completedCheckpoint, 
mapper.readTree(archive2.getJson()));
-       }
-
-       @Test
-       public void testGetPaths() {
-               CheckpointStatsDetailsHandler handler = new 
CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               
Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid", paths[0]);
-       }
-
-       /**
-        * Tests request with illegal checkpoint ID param.
-        */
-       @Test
-       public void testIllegalCheckpointId() throws Exception {
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsDetailsHandler handler = new 
CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
-               Map<String, String> params = new HashMap<>();
-               params.put("checkpointid", "illegal checkpoint");
-               String json = handler.handleRequest(graph, params).get();
-
-               assertEquals("{}", json);
-       }
-
-       /**
-        * Tests request with missing checkpoint ID param.
-        */
-       @Test
-       public void testNoCheckpointIdParam() throws Exception {
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               CheckpointStatsDetailsHandler handler = new 
CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
-               String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap()).get();
-
-               assertEquals("{}", json);
-       }
-
-       /**
-        * Test lookup of not existing checkpoint in history.
-        */
-       @Test
-       public void testCheckpointNotFound() throws Exception {
-               CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
-               when(history.getCheckpointById(anyLong())).thenReturn(null); // 
not found
-
-               CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
-               when(snapshot.getHistory()).thenReturn(history);
-
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-
-               CheckpointStatsDetailsHandler handler = new 
CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
-               Map<String, String> params = new HashMap<>();
-               params.put("checkpointid", "123");
-               String json = handler.handleRequest(graph, params).get();
-
-               assertEquals("{}", json);
-               verify(history, times(1)).getCheckpointById(anyLong());
-       }
-
-       /**
-        * Tests a checkpoint details request for an in progress checkpoint.
-        */
-       @Test
-       public void testCheckpointDetailsRequestInProgressCheckpoint() throws 
Exception {
-               PendingCheckpointStats checkpoint = 
mock(PendingCheckpointStats.class);
-               when(checkpoint.getCheckpointId()).thenReturn(1992139L);
-               
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
-               
when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
-               when(checkpoint.getTriggerTimestamp()).thenReturn(1919191900L);
-               
when(checkpoint.getLatestAckTimestamp()).thenReturn(1977791901L);
-               when(checkpoint.getStateSize()).thenReturn(111939272822L);
-               when(checkpoint.getEndToEndDuration()).thenReturn(121191L);
-               when(checkpoint.getAlignmentBuffered()).thenReturn(1L);
-               when(checkpoint.getNumberOfSubtasks()).thenReturn(501);
-               
when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(101);
-
-               List<TaskStateStats> taskStats = new ArrayList<>();
-               TaskStateStats task1 = createTaskStateStats();
-               TaskStateStats task2 = createTaskStateStats();
-               taskStats.add(task1);
-               taskStats.add(task2);
-
-               when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
-
-               JsonNode rootNode = triggerRequest(checkpoint);
-
-               assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong());
-               assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
-               assertEquals(checkpoint.getProperties().isSavepoint(), 
rootNode.get("is_savepoint").asBoolean());
-               assertEquals(checkpoint.getTriggerTimestamp(), 
rootNode.get("trigger_timestamp").asLong());
-               assertEquals(checkpoint.getLatestAckTimestamp(), 
rootNode.get("latest_ack_timestamp").asLong());
-               assertEquals(checkpoint.getStateSize(), 
rootNode.get("state_size").asLong());
-               assertEquals(checkpoint.getEndToEndDuration(), 
rootNode.get("end_to_end_duration").asLong());
-               assertEquals(checkpoint.getAlignmentBuffered(), 
rootNode.get("alignment_buffered").asLong());
-               assertEquals(checkpoint.getNumberOfSubtasks(), 
rootNode.get("num_subtasks").asInt());
-               assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), 
rootNode.get("num_acknowledged_subtasks").asInt());
-
-               verifyTaskNodes(taskStats, rootNode);
-       }
-
-       /**
-        * Tests a checkpoint details request for a completed checkpoint.
-        */
-       @Test
-       public void testCheckpointDetailsRequestCompletedCheckpoint() throws 
Exception {
-               CompletedCheckpointStats checkpoint = 
createCompletedCheckpoint();
-
-               JsonNode rootNode = triggerRequest(checkpoint);
-
-               compareCompletedCheckpoint(checkpoint, rootNode);
-
-               verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode);
-       }
-
-       /**
-        * Tests a checkpoint details request for a failed checkpoint.
-        */
-       @Test
-       public void testCheckpointDetailsRequestFailedCheckpoint() throws 
Exception {
-               FailedCheckpointStats checkpoint = createFailedCheckpoint();
-
-               JsonNode rootNode = triggerRequest(checkpoint);
-
-               compareFailedCheckpoint(checkpoint, rootNode);
-
-               verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode);
-       }
-
-       // 
------------------------------------------------------------------------
-
-       private static CompletedCheckpointStats createCompletedCheckpoint() {
-               CompletedCheckpointStats checkpoint = 
mock(CompletedCheckpointStats.class);
-               when(checkpoint.getCheckpointId()).thenReturn(1818213L);
-               
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
-               
when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
-               when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
-               when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
-               when(checkpoint.getStateSize()).thenReturn(925281L);
-               when(checkpoint.getEndToEndDuration()).thenReturn(181819L);
-               when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L);
-               when(checkpoint.getNumberOfSubtasks()).thenReturn(181271);
-               
when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821);
-               when(checkpoint.isDiscarded()).thenReturn(true);
-               
when(checkpoint.getExternalPath()).thenReturn("checkpoint-external-path");
-
-               List<TaskStateStats> taskStats = new ArrayList<>();
-               TaskStateStats task1 = createTaskStateStats();
-               TaskStateStats task2 = createTaskStateStats();
-               taskStats.add(task1);
-               taskStats.add(task2);
-
-               when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
-
-               return checkpoint;
-       }
-
-       private static void compareCompletedCheckpoint(CompletedCheckpointStats 
checkpoint, JsonNode rootNode) {
-               assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong());
-               assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
-               assertEquals(checkpoint.getProperties().isSavepoint(), 
rootNode.get("is_savepoint").asBoolean());
-               assertEquals(checkpoint.getTriggerTimestamp(), 
rootNode.get("trigger_timestamp").asLong());
-               assertEquals(checkpoint.getLatestAckTimestamp(), 
rootNode.get("latest_ack_timestamp").asLong());
-               assertEquals(checkpoint.getStateSize(), 
rootNode.get("state_size").asLong());
-               assertEquals(checkpoint.getEndToEndDuration(), 
rootNode.get("end_to_end_duration").asLong());
-               assertEquals(checkpoint.getAlignmentBuffered(), 
rootNode.get("alignment_buffered").asLong());
-               assertEquals(checkpoint.isDiscarded(), 
rootNode.get("discarded").asBoolean());
-               assertEquals(checkpoint.getExternalPath(), 
rootNode.get("external_path").asText());
-               assertEquals(checkpoint.getNumberOfSubtasks(), 
rootNode.get("num_subtasks").asInt());
-               assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), 
rootNode.get("num_acknowledged_subtasks").asInt());
-       }
-
-       private static FailedCheckpointStats createFailedCheckpoint() {
-               FailedCheckpointStats checkpoint = 
mock(FailedCheckpointStats.class);
-               when(checkpoint.getCheckpointId()).thenReturn(1818214L);
-               
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
-               
when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
-               when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
-               when(checkpoint.getLatestAckTimestamp()).thenReturn(11029222L);
-               when(checkpoint.getStateSize()).thenReturn(925281L);
-               when(checkpoint.getEndToEndDuration()).thenReturn(181819L);
-               when(checkpoint.getAlignmentBuffered()).thenReturn(1010198L);
-               when(checkpoint.getNumberOfSubtasks()).thenReturn(181271);
-               
when(checkpoint.getNumberOfAcknowledgedSubtasks()).thenReturn(29821);
-               
when(checkpoint.getFailureTimestamp()).thenReturn(123012890312093L);
-               
when(checkpoint.getFailureMessage()).thenReturn("failure-message");
-
-               List<TaskStateStats> taskStats = new ArrayList<>();
-               TaskStateStats task1 = createTaskStateStats();
-               TaskStateStats task2 = createTaskStateStats();
-               taskStats.add(task1);
-               taskStats.add(task2);
-
-               when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
-
-               return checkpoint;
-       }
-
-       private static void compareFailedCheckpoint(FailedCheckpointStats 
checkpoint, JsonNode rootNode) {
-               assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong());
-               assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
-               assertEquals(checkpoint.getProperties().isSavepoint(), 
rootNode.get("is_savepoint").asBoolean());
-               assertEquals(checkpoint.getTriggerTimestamp(), 
rootNode.get("trigger_timestamp").asLong());
-               assertEquals(checkpoint.getLatestAckTimestamp(), 
rootNode.get("latest_ack_timestamp").asLong());
-               assertEquals(checkpoint.getStateSize(), 
rootNode.get("state_size").asLong());
-               assertEquals(checkpoint.getEndToEndDuration(), 
rootNode.get("end_to_end_duration").asLong());
-               assertEquals(checkpoint.getAlignmentBuffered(), 
rootNode.get("alignment_buffered").asLong());
-               assertEquals(checkpoint.getFailureTimestamp(), 
rootNode.get("failure_timestamp").asLong());
-               assertEquals(checkpoint.getFailureMessage(), 
rootNode.get("failure_message").asText());
-               assertEquals(checkpoint.getNumberOfSubtasks(), 
rootNode.get("num_subtasks").asInt());
-               assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), 
rootNode.get("num_acknowledged_subtasks").asInt());
-       }
-
-       private static JsonNode triggerRequest(AbstractCheckpointStats 
checkpoint) throws Exception {
-               CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
-               
when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
-               CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
-               when(snapshot.getHistory()).thenReturn(history);
-
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-
-               CheckpointStatsDetailsHandler handler = new 
CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor(), new CheckpointStatsCache(0));
-               Map<String, String> params = new HashMap<>();
-               params.put("checkpointid", "123");
-               String json = handler.handleRequest(graph, params).get();
-
-               ObjectMapper mapper = new ObjectMapper();
-               return mapper.readTree(json);
-       }
-
-       private static void verifyTaskNodes(Collection<TaskStateStats> tasks, 
JsonNode parentNode) {
-               for (TaskStateStats task : tasks) {
-                       long duration = 
ThreadLocalRandom.current().nextInt(128);
-
-                       JsonNode taskNode = 
parentNode.get("tasks").get(task.getJobVertexId().toString());
-                       assertEquals(task.getLatestAckTimestamp(), 
taskNode.get("latest_ack_timestamp").asLong());
-                       assertEquals(task.getStateSize(), 
taskNode.get("state_size").asLong());
-                       
assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), 
taskNode.get("end_to_end_duration").asLong());
-                       assertEquals(task.getAlignmentBuffered(), 
taskNode.get("alignment_buffered").asLong());
-                       assertEquals(task.getNumberOfSubtasks(), 
taskNode.get("num_subtasks").asInt());
-                       assertEquals(task.getNumberOfAcknowledgedSubtasks(), 
taskNode.get("num_acknowledged_subtasks").asInt());
-               }
-       }
-
-       private static TaskStateStats createTaskStateStats() {
-               ThreadLocalRandom rand = ThreadLocalRandom.current();
-
-               TaskStateStats task = mock(TaskStateStats.class);
-               when(task.getJobVertexId()).thenReturn(new JobVertexID());
-               
when(task.getLatestAckTimestamp()).thenReturn(rand.nextLong(1024) + 1);
-               when(task.getStateSize()).thenReturn(rand.nextLong(1024) + 1);
-               
when(task.getEndToEndDuration(anyLong())).thenReturn(rand.nextLong(1024) + 1);
-               
when(task.getAlignmentBuffered()).thenReturn(rand.nextLong(1024) + 1);
-               when(task.getNumberOfSubtasks()).thenReturn(rand.nextInt(1024) 
+ 1);
-               
when(task.getNumberOfAcknowledgedSubtasks()).thenReturn(rand.nextInt(1024) + 1);
-               return task;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
deleted file mode 100644
index ed73a62..0000000
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ /dev/null
@@ -1,432 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CheckpointProperties;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
-import org.apache.flink.runtime.checkpoint.CheckpointStatsStatus;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.CompletedCheckpointStatsSummary;
-import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
-import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
-import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
-import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
-import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
-import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
-
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Tests for the CheckpointStatsHandler.
- */
-public class CheckpointStatsHandlerTest {
-
-       @Test
-       public void testArchiver() throws IOException {
-               JsonArchivist archivist = new 
CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist();
-               TestCheckpointStats testCheckpointStats = 
createTestCheckpointStats();
-               when(testCheckpointStats.graph.getJobID()).thenReturn(new 
JobID());
-
-               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(testCheckpointStats.graph);
-               Assert.assertEquals(3, archives.size());
-
-               ObjectMapper mapper = new ObjectMapper();
-
-               Iterator<ArchivedJson> iterator = archives.iterator();
-               ArchivedJson archive1 = iterator.next();
-               Assert.assertEquals("/jobs/" + 
testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + 
testCheckpointStats.inProgress.getCheckpointId(), archive1.getPath());
-               compareInProgressCheckpoint(testCheckpointStats.inProgress, 
mapper.readTree(archive1.getJson()));
-
-               ArchivedJson archive2 = iterator.next();
-               Assert.assertEquals("/jobs/" + 
testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + 
testCheckpointStats.completedSavepoint.getCheckpointId(), archive2.getPath());
-               
compareCompletedSavepoint(testCheckpointStats.completedSavepoint, 
mapper.readTree(archive2.getJson()));
-
-               ArchivedJson archive3 = iterator.next();
-               Assert.assertEquals("/jobs/" + 
testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + 
testCheckpointStats.failed.getCheckpointId(), archive3.getPath());
-               compareFailedCheckpoint(testCheckpointStats.failed, 
mapper.readTree(archive3.getJson()));
-       }
-
-       @Test
-       public void testGetPaths() {
-               CheckpointStatsHandler handler = new 
CheckpointStatsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
-               String[] paths = handler.getPaths();
-               Assert.assertEquals(1, paths.length);
-               Assert.assertEquals("/jobs/:jobid/checkpoints", paths[0]);
-       }
-
-       /**
-        * Tests a complete checkpoint stats snapshot.
-        */
-       @Test
-       public void testCheckpointStatsRequest() throws Exception {
-               TestCheckpointStats testCheckpointStats = 
createTestCheckpointStats();
-
-               CheckpointStatsHandler handler = new 
CheckpointStatsHandler(mock(ExecutionGraphHolder.class), 
Executors.directExecutor());
-               String json = handler.handleRequest(testCheckpointStats.graph, 
Collections.<String, String>emptyMap()).get();
-
-               ObjectMapper mapper = new ObjectMapper();
-               JsonNode rootNode = mapper.readTree(json);
-
-               compareCheckpointStats(testCheckpointStats, rootNode);
-       }
-
-       private static TestCheckpointStats createTestCheckpointStats() {
-               // Counts
-               CheckpointStatsCounts counts = 
mock(CheckpointStatsCounts.class);
-               
when(counts.getNumberOfRestoredCheckpoints()).thenReturn(123123123L);
-               
when(counts.getTotalNumberOfCheckpoints()).thenReturn(12981231203L);
-               
when(counts.getNumberOfInProgressCheckpoints()).thenReturn(191919);
-               
when(counts.getNumberOfCompletedCheckpoints()).thenReturn(882828200L);
-               
when(counts.getNumberOfFailedCheckpoints()).thenReturn(99171510L);
-
-               // Summary
-               CompletedCheckpointStatsSummary summary = 
mock(CompletedCheckpointStatsSummary.class);
-
-               MinMaxAvgStats stateSizeSummary = mock(MinMaxAvgStats.class);
-               when(stateSizeSummary.getMinimum()).thenReturn(81238123L);
-               when(stateSizeSummary.getMaximum()).thenReturn(19919191999L);
-               when(stateSizeSummary.getAverage()).thenReturn(1133L);
-
-               MinMaxAvgStats durationSummary = mock(MinMaxAvgStats.class);
-               when(durationSummary.getMinimum()).thenReturn(1182L);
-               when(durationSummary.getMaximum()).thenReturn(88654L);
-               when(durationSummary.getAverage()).thenReturn(171L);
-
-               MinMaxAvgStats alignmentBufferedSummary = 
mock(MinMaxAvgStats.class);
-               
when(alignmentBufferedSummary.getMinimum()).thenReturn(81818181899L);
-               
when(alignmentBufferedSummary.getMaximum()).thenReturn(89999911118654L);
-               
when(alignmentBufferedSummary.getAverage()).thenReturn(11203131L);
-
-               when(summary.getStateSizeStats()).thenReturn(stateSizeSummary);
-               
when(summary.getEndToEndDurationStats()).thenReturn(durationSummary);
-               
when(summary.getAlignmentBufferedStats()).thenReturn(alignmentBufferedSummary);
-
-               // Latest
-               CompletedCheckpointStats latestCompleted = 
mock(CompletedCheckpointStats.class);
-               when(latestCompleted.getCheckpointId()).thenReturn(1992139L);
-               
when(latestCompleted.getTriggerTimestamp()).thenReturn(1919191900L);
-               
when(latestCompleted.getLatestAckTimestamp()).thenReturn(1977791901L);
-               when(latestCompleted.getStateSize()).thenReturn(111939272822L);
-               when(latestCompleted.getEndToEndDuration()).thenReturn(121191L);
-               when(latestCompleted.getAlignmentBuffered()).thenReturn(1L);
-               
when(latestCompleted.getExternalPath()).thenReturn("latest-completed-external-path");
-
-               CompletedCheckpointStats latestSavepoint = 
mock(CompletedCheckpointStats.class);
-               when(latestSavepoint.getCheckpointId()).thenReturn(1992140L);
-               
when(latestSavepoint.getTriggerTimestamp()).thenReturn(1919191900L);
-               
when(latestSavepoint.getLatestAckTimestamp()).thenReturn(1977791901L);
-               when(latestSavepoint.getStateSize()).thenReturn(111939272822L);
-               when(latestSavepoint.getEndToEndDuration()).thenReturn(121191L);
-               
when(latestCompleted.getAlignmentBuffered()).thenReturn(182813L);
-               
when(latestSavepoint.getExternalPath()).thenReturn("savepoint-external-path");
-
-               FailedCheckpointStats latestFailed = 
mock(FailedCheckpointStats.class);
-               when(latestFailed.getCheckpointId()).thenReturn(1112L);
-               when(latestFailed.getTriggerTimestamp()).thenReturn(12828L);
-               when(latestFailed.getLatestAckTimestamp()).thenReturn(1901L);
-               when(latestFailed.getFailureTimestamp()).thenReturn(11999976L);
-               when(latestFailed.getStateSize()).thenReturn(111L);
-               when(latestFailed.getEndToEndDuration()).thenReturn(12L);
-               when(latestFailed.getAlignmentBuffered()).thenReturn(2L);
-               when(latestFailed.getFailureMessage()).thenReturn("expected 
cause");
-
-               RestoredCheckpointStats latestRestored = 
mock(RestoredCheckpointStats.class);
-               when(latestRestored.getCheckpointId()).thenReturn(1199L);
-               when(latestRestored.getRestoreTimestamp()).thenReturn(434242L);
-               
when(latestRestored.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
-               when(latestRestored.getExternalPath()).thenReturn("restored 
savepoint path");
-
-               // History
-               CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
-               List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
-
-               PendingCheckpointStats inProgress = 
mock(PendingCheckpointStats.class);
-               when(inProgress.getCheckpointId()).thenReturn(1992141L);
-               
when(inProgress.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
-               
when(inProgress.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
-               when(inProgress.getTriggerTimestamp()).thenReturn(1919191900L);
-               
when(inProgress.getLatestAckTimestamp()).thenReturn(1977791901L);
-               when(inProgress.getStateSize()).thenReturn(111939272822L);
-               when(inProgress.getEndToEndDuration()).thenReturn(121191L);
-               when(inProgress.getAlignmentBuffered()).thenReturn(1L);
-               when(inProgress.getNumberOfSubtasks()).thenReturn(501);
-               
when(inProgress.getNumberOfAcknowledgedSubtasks()).thenReturn(101);
-
-               CompletedCheckpointStats completedSavepoint = 
mock(CompletedCheckpointStats.class);
-               when(completedSavepoint.getCheckpointId()).thenReturn(1322139L);
-               
when(completedSavepoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
-               
when(completedSavepoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
-               
when(completedSavepoint.getTriggerTimestamp()).thenReturn(191900L);
-               
when(completedSavepoint.getLatestAckTimestamp()).thenReturn(197791901L);
-               when(completedSavepoint.getStateSize()).thenReturn(1119822L);
-               
when(completedSavepoint.getEndToEndDuration()).thenReturn(12191L);
-               
when(completedSavepoint.getAlignmentBuffered()).thenReturn(111L);
-               
when(completedSavepoint.getNumberOfSubtasks()).thenReturn(33501);
-               
when(completedSavepoint.getNumberOfAcknowledgedSubtasks()).thenReturn(211);
-               when(completedSavepoint.isDiscarded()).thenReturn(true);
-               
when(completedSavepoint.getExternalPath()).thenReturn("completed-external-path");
-
-               FailedCheckpointStats failed = 
mock(FailedCheckpointStats.class);
-               when(failed.getCheckpointId()).thenReturn(110719L);
-               
when(failed.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
-               
when(failed.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
-               when(failed.getTriggerTimestamp()).thenReturn(191900L);
-               when(failed.getLatestAckTimestamp()).thenReturn(197791901L);
-               when(failed.getStateSize()).thenReturn(1119822L);
-               when(failed.getEndToEndDuration()).thenReturn(12191L);
-               when(failed.getAlignmentBuffered()).thenReturn(111L);
-               when(failed.getNumberOfSubtasks()).thenReturn(33501);
-               when(failed.getNumberOfAcknowledgedSubtasks()).thenReturn(1);
-               when(failed.getFailureTimestamp()).thenReturn(119230L);
-               when(failed.getFailureMessage()).thenReturn("failure message");
-
-               checkpoints.add(inProgress);
-               checkpoints.add(completedSavepoint);
-               checkpoints.add(failed);
-               when(history.getCheckpoints()).thenReturn(checkpoints);
-               
when(history.getLatestCompletedCheckpoint()).thenReturn(latestCompleted);
-               when(history.getLatestSavepoint()).thenReturn(latestSavepoint);
-               
when(history.getLatestFailedCheckpoint()).thenReturn(latestFailed);
-
-               CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
-               when(snapshot.getCounts()).thenReturn(counts);
-               when(snapshot.getSummaryStats()).thenReturn(summary);
-               when(snapshot.getHistory()).thenReturn(history);
-               
when(snapshot.getLatestRestoredCheckpoint()).thenReturn(latestRestored);
-
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-
-               return new TestCheckpointStats(
-                       graph, counts, stateSizeSummary, durationSummary, 
alignmentBufferedSummary, summary,
-                       latestCompleted, latestSavepoint, latestFailed, 
latestRestored, inProgress,
-                       completedSavepoint, failed, history, snapshot
-               );
-       }
-
-       private static void compareCheckpointStats(TestCheckpointStats 
checkpointStats, JsonNode rootNode) {
-               CheckpointStatsCounts counts = checkpointStats.counts;
-               JsonNode countNode = rootNode.get("counts");
-               assertEquals(counts.getNumberOfRestoredCheckpoints(), 
countNode.get("restored").asLong());
-               assertEquals(counts.getTotalNumberOfCheckpoints(), 
countNode.get("total").asLong());
-               assertEquals(counts.getNumberOfInProgressCheckpoints(), 
countNode.get("in_progress").asLong());
-               assertEquals(counts.getNumberOfCompletedCheckpoints(), 
countNode.get("completed").asLong());
-               assertEquals(counts.getNumberOfFailedCheckpoints(), 
countNode.get("failed").asLong());
-
-               MinMaxAvgStats stateSizeSummary = 
checkpointStats.stateSizeSummary;
-               JsonNode summaryNode = rootNode.get("summary");
-               JsonNode sizeSummaryNode = summaryNode.get("state_size");
-               assertEquals(stateSizeSummary.getMinimum(), 
sizeSummaryNode.get("min").asLong());
-               assertEquals(stateSizeSummary.getMaximum(), 
sizeSummaryNode.get("max").asLong());
-               assertEquals(stateSizeSummary.getAverage(), 
sizeSummaryNode.get("avg").asLong());
-
-               MinMaxAvgStats durationSummary = 
checkpointStats.durationSummary;
-               JsonNode durationSummaryNode = 
summaryNode.get("end_to_end_duration");
-               assertEquals(durationSummary.getMinimum(), 
durationSummaryNode.get("min").asLong());
-               assertEquals(durationSummary.getMaximum(), 
durationSummaryNode.get("max").asLong());
-               assertEquals(durationSummary.getAverage(), 
durationSummaryNode.get("avg").asLong());
-
-               MinMaxAvgStats alignmentBufferedSummary = 
checkpointStats.alignmentBufferedSummary;
-               JsonNode alignmentBufferedNode = 
summaryNode.get("alignment_buffered");
-               assertEquals(alignmentBufferedSummary.getMinimum(), 
alignmentBufferedNode.get("min").asLong());
-               assertEquals(alignmentBufferedSummary.getMaximum(), 
alignmentBufferedNode.get("max").asLong());
-               assertEquals(alignmentBufferedSummary.getAverage(), 
alignmentBufferedNode.get("avg").asLong());
-
-               CompletedCheckpointStats latestCompleted = 
checkpointStats.latestCompleted;
-               JsonNode latestNode = rootNode.get("latest");
-               JsonNode latestCheckpointNode = latestNode.get("completed");
-               assertEquals(latestCompleted.getCheckpointId(), 
latestCheckpointNode.get("id").asLong());
-               assertEquals(latestCompleted.getTriggerTimestamp(), 
latestCheckpointNode.get("trigger_timestamp").asLong());
-               assertEquals(latestCompleted.getLatestAckTimestamp(), 
latestCheckpointNode.get("latest_ack_timestamp").asLong());
-               assertEquals(latestCompleted.getStateSize(), 
latestCheckpointNode.get("state_size").asLong());
-               assertEquals(latestCompleted.getEndToEndDuration(), 
latestCheckpointNode.get("end_to_end_duration").asLong());
-               assertEquals(latestCompleted.getAlignmentBuffered(), 
latestCheckpointNode.get("alignment_buffered").asLong());
-               assertEquals(latestCompleted.getExternalPath(), 
latestCheckpointNode.get("external_path").asText());
-
-               CompletedCheckpointStats latestSavepoint = 
checkpointStats.latestSavepoint;
-               JsonNode latestSavepointNode = latestNode.get("savepoint");
-               assertEquals(latestSavepoint.getCheckpointId(), 
latestSavepointNode.get("id").asLong());
-               assertEquals(latestSavepoint.getTriggerTimestamp(), 
latestSavepointNode.get("trigger_timestamp").asLong());
-               assertEquals(latestSavepoint.getLatestAckTimestamp(), 
latestSavepointNode.get("latest_ack_timestamp").asLong());
-               assertEquals(latestSavepoint.getStateSize(), 
latestSavepointNode.get("state_size").asLong());
-               assertEquals(latestSavepoint.getEndToEndDuration(), 
latestSavepointNode.get("end_to_end_duration").asLong());
-               assertEquals(latestSavepoint.getAlignmentBuffered(), 
latestSavepointNode.get("alignment_buffered").asLong());
-               assertEquals(latestSavepoint.getExternalPath(), 
latestSavepointNode.get("external_path").asText());
-
-               FailedCheckpointStats latestFailed = 
checkpointStats.latestFailed;
-               JsonNode latestFailedNode = latestNode.get("failed");
-               assertEquals(latestFailed.getCheckpointId(), 
latestFailedNode.get("id").asLong());
-               assertEquals(latestFailed.getTriggerTimestamp(), 
latestFailedNode.get("trigger_timestamp").asLong());
-               assertEquals(latestFailed.getLatestAckTimestamp(), 
latestFailedNode.get("latest_ack_timestamp").asLong());
-               assertEquals(latestFailed.getStateSize(), 
latestFailedNode.get("state_size").asLong());
-               assertEquals(latestFailed.getEndToEndDuration(), 
latestFailedNode.get("end_to_end_duration").asLong());
-               assertEquals(latestFailed.getAlignmentBuffered(), 
latestFailedNode.get("alignment_buffered").asLong());
-               assertEquals(latestFailed.getFailureTimestamp(), 
latestFailedNode.get("failure_timestamp").asLong());
-               assertEquals(latestFailed.getFailureMessage(), 
latestFailedNode.get("failure_message").asText());
-
-               RestoredCheckpointStats latestRestored = 
checkpointStats.latestRestored;
-               JsonNode latestRestoredNode = latestNode.get("restored");
-               assertEquals(latestRestored.getCheckpointId(), 
latestRestoredNode.get("id").asLong());
-               assertEquals(latestRestored.getRestoreTimestamp(), 
latestRestoredNode.get("restore_timestamp").asLong());
-               assertEquals(latestRestored.getProperties().isSavepoint(), 
latestRestoredNode.get("is_savepoint").asBoolean());
-               assertEquals(latestRestored.getExternalPath(), 
latestRestoredNode.get("external_path").asText());
-
-               JsonNode historyNode = rootNode.get("history");
-               Iterator<JsonNode> it = historyNode.iterator();
-
-               assertTrue(it.hasNext());
-               JsonNode inProgressNode = it.next();
-
-               PendingCheckpointStats inProgress = checkpointStats.inProgress;
-               compareInProgressCheckpoint(inProgress, inProgressNode);
-
-               assertTrue(it.hasNext());
-               JsonNode completedSavepointNode = it.next();
-
-               CompletedCheckpointStats completedSavepoint = 
checkpointStats.completedSavepoint;
-               compareCompletedSavepoint(completedSavepoint, 
completedSavepointNode);
-
-               assertTrue(it.hasNext());
-               JsonNode failedNode = it.next();
-
-               FailedCheckpointStats failed = checkpointStats.failed;
-               compareFailedCheckpoint(failed, failedNode);
-
-               assertFalse(it.hasNext());
-       }
-
-       private static void compareInProgressCheckpoint(PendingCheckpointStats 
inProgress, JsonNode inProgressNode) {
-               assertEquals(inProgress.getCheckpointId(), 
inProgressNode.get("id").asLong());
-               assertEquals(inProgress.getStatus().toString(), 
inProgressNode.get("status").asText());
-               assertEquals(inProgress.getProperties().isSavepoint(), 
inProgressNode.get("is_savepoint").asBoolean());
-               assertEquals(inProgress.getTriggerTimestamp(), 
inProgressNode.get("trigger_timestamp").asLong());
-               assertEquals(inProgress.getLatestAckTimestamp(), 
inProgressNode.get("latest_ack_timestamp").asLong());
-               assertEquals(inProgress.getStateSize(), 
inProgressNode.get("state_size").asLong());
-               assertEquals(inProgress.getEndToEndDuration(), 
inProgressNode.get("end_to_end_duration").asLong());
-               assertEquals(inProgress.getAlignmentBuffered(), 
inProgressNode.get("alignment_buffered").asLong());
-               assertEquals(inProgress.getNumberOfSubtasks(), 
inProgressNode.get("num_subtasks").asInt());
-               assertEquals(inProgress.getNumberOfAcknowledgedSubtasks(), 
inProgressNode.get("num_acknowledged_subtasks").asInt());
-       }
-
-       private static void compareCompletedSavepoint(CompletedCheckpointStats 
completedSavepoint, JsonNode completedSavepointNode) {
-               assertEquals(completedSavepoint.getCheckpointId(), 
completedSavepointNode.get("id").asLong());
-               assertEquals(completedSavepoint.getStatus().toString(), 
completedSavepointNode.get("status").asText());
-               assertEquals(completedSavepoint.getProperties().isSavepoint(), 
completedSavepointNode.get("is_savepoint").asBoolean());
-               assertEquals(completedSavepoint.getTriggerTimestamp(), 
completedSavepointNode.get("trigger_timestamp").asLong());
-               assertEquals(completedSavepoint.getLatestAckTimestamp(), 
completedSavepointNode.get("latest_ack_timestamp").asLong());
-               assertEquals(completedSavepoint.getStateSize(), 
completedSavepointNode.get("state_size").asLong());
-               assertEquals(completedSavepoint.getEndToEndDuration(), 
completedSavepointNode.get("end_to_end_duration").asLong());
-               assertEquals(completedSavepoint.getAlignmentBuffered(), 
completedSavepointNode.get("alignment_buffered").asLong());
-               assertEquals(completedSavepoint.getNumberOfSubtasks(), 
completedSavepointNode.get("num_subtasks").asInt());
-               
assertEquals(completedSavepoint.getNumberOfAcknowledgedSubtasks(), 
completedSavepointNode.get("num_acknowledged_subtasks").asInt());
-
-               assertEquals(completedSavepoint.getExternalPath(), 
completedSavepointNode.get("external_path").asText());
-               assertEquals(completedSavepoint.isDiscarded(), 
completedSavepointNode.get("discarded").asBoolean());
-       }
-
-       private static void compareFailedCheckpoint(FailedCheckpointStats 
failed, JsonNode failedNode) {
-               assertEquals(failed.getCheckpointId(), 
failedNode.get("id").asLong());
-               assertEquals(failed.getStatus().toString(), 
failedNode.get("status").asText());
-               assertEquals(failed.getProperties().isSavepoint(), 
failedNode.get("is_savepoint").asBoolean());
-               assertEquals(failed.getTriggerTimestamp(), 
failedNode.get("trigger_timestamp").asLong());
-               assertEquals(failed.getLatestAckTimestamp(), 
failedNode.get("latest_ack_timestamp").asLong());
-               assertEquals(failed.getStateSize(), 
failedNode.get("state_size").asLong());
-               assertEquals(failed.getEndToEndDuration(), 
failedNode.get("end_to_end_duration").asLong());
-               assertEquals(failed.getAlignmentBuffered(), 
failedNode.get("alignment_buffered").asLong());
-               assertEquals(failed.getNumberOfSubtasks(), 
failedNode.get("num_subtasks").asInt());
-               assertEquals(failed.getNumberOfAcknowledgedSubtasks(), 
failedNode.get("num_acknowledged_subtasks").asInt());
-
-               assertEquals(failed.getFailureTimestamp(), 
failedNode.get("failure_timestamp").asLong());
-               assertEquals(failed.getFailureMessage(), 
failedNode.get("failure_message").asText());
-       }
-
-       private static class TestCheckpointStats {
-               public final AccessExecutionGraph graph;
-               public final CheckpointStatsCounts counts;
-               public final MinMaxAvgStats stateSizeSummary;
-               public final MinMaxAvgStats durationSummary;
-               public final MinMaxAvgStats alignmentBufferedSummary;
-               public final CompletedCheckpointStatsSummary summary;
-               public final CompletedCheckpointStats latestCompleted;
-               public final CompletedCheckpointStats latestSavepoint;
-               public final FailedCheckpointStats latestFailed;
-               public final RestoredCheckpointStats latestRestored;
-               public final PendingCheckpointStats inProgress;
-               public final CompletedCheckpointStats completedSavepoint;
-               public final FailedCheckpointStats failed;
-               public final CheckpointStatsHistory history;
-               public final CheckpointStatsSnapshot snapshot;
-
-               public TestCheckpointStats(
-                               AccessExecutionGraph graph,
-                               CheckpointStatsCounts counts,
-                               MinMaxAvgStats stateSizeSummary,
-                               MinMaxAvgStats durationSummary,
-                               MinMaxAvgStats alignmentBufferedSummary,
-                               CompletedCheckpointStatsSummary summary,
-                               CompletedCheckpointStats latestCompleted,
-                               CompletedCheckpointStats latestSavepoint,
-                               FailedCheckpointStats latestFailed,
-                               RestoredCheckpointStats latestRestored,
-                               PendingCheckpointStats inProgress,
-                               CompletedCheckpointStats completedSavepoint,
-                               FailedCheckpointStats failed,
-                               CheckpointStatsHistory history,
-                               CheckpointStatsSnapshot snapshot) {
-                       this.graph = graph;
-                       this.counts = counts;
-                       this.stateSizeSummary = stateSizeSummary;
-                       this.durationSummary = durationSummary;
-                       this.alignmentBufferedSummary = 
alignmentBufferedSummary;
-                       this.summary = summary;
-                       this.latestCompleted = latestCompleted;
-                       this.latestSavepoint = latestSavepoint;
-                       this.latestFailed = latestFailed;
-                       this.latestRestored = latestRestored;
-                       this.inProgress = inProgress;
-                       this.completedSavepoint = completedSavepoint;
-                       this.failed = failed;
-                       this.history = history;
-                       this.snapshot = snapshot;
-               }
-       }
-}

Reply via email to