Repository: flink
Updated Branches:
  refs/heads/master 243ef69bf -> 7fe0eb477


http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
index 939f439..0076d42 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
@@ -21,12 +21,34 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class SubtasksTimesHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
SubtasksTimesHandler.SubtasksTimesJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
+               AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/vertices/" + originalTask.getJobVertexId() + "/subtasktimes", 
archive.getPath());
+               compareSubtaskTimes(originalTask, originalAttempt, 
archive.getJson());
+       }
+
        @Test
        public void testGetPaths() {
                SubtasksTimesHandler handler = new SubtasksTimesHandler(null);
@@ -41,6 +63,10 @@ public class SubtasksTimesHandlerTest {
                AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
                String json = 
SubtasksTimesHandler.createSubtaskTimesJson(originalTask);
 
+               compareSubtaskTimes(originalTask, originalAttempt, json);
+       }
+       
+       private static void compareSubtaskTimes(AccessExecutionJobVertex 
originalTask, AccessExecution originalAttempt, String json) throws IOException {
                JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
 
                Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
index e570e18..9d339f5 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandlerTest.java
@@ -20,14 +20,20 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
@@ -38,6 +44,37 @@ import static org.mockito.Mockito.when;
 public class CheckpointConfigHandlerTest {
 
        @Test
+       public void testArchiver() throws IOException {
+               JsonArchivist archivist = new 
CheckpointConfigHandler.CheckpointConfigJsonArchivist();
+               GraphAndSettings graphAndSettings = 
createGraphAndSettings(true, true);
+
+               AccessExecutionGraph graph = graphAndSettings.graph;
+               when(graph.getJobID()).thenReturn(new JobID());
+               JobSnapshottingSettings settings = 
graphAndSettings.snapshottingSettings;
+               ExternalizedCheckpointSettings externalizedSettings = 
graphAndSettings.externalizedSettings;
+               
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(graph);
+               Assert.assertEquals(1, archives.size());
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + graph.getJobID() + 
"/checkpoints/config", archive.getPath());
+
+               ObjectMapper mapper = new ObjectMapper();
+               JsonNode rootNode = mapper.readTree(archive.getJson());
+
+               Assert.assertEquals("exactly_once", 
rootNode.get("mode").asText());
+               Assert.assertEquals(settings.getCheckpointInterval(), 
rootNode.get("interval").asLong());
+               Assert.assertEquals(settings.getCheckpointTimeout(), 
rootNode.get("timeout").asLong());
+               Assert.assertEquals(settings.getMinPauseBetweenCheckpoints(), 
rootNode.get("min_pause").asLong());
+               Assert.assertEquals(settings.getMaxConcurrentCheckpoints(), 
rootNode.get("max_concurrent").asInt());
+
+               JsonNode externalizedNode = rootNode.get("externalization");
+               Assert.assertNotNull(externalizedNode);
+               
Assert.assertEquals(externalizedSettings.externalizeCheckpoints(), 
externalizedNode.get("enabled").asBoolean());
+               
Assert.assertEquals(externalizedSettings.deleteOnCancellation(), 
externalizedNode.get("delete_on_cancellation").asBoolean());
+
+       }
+
+       @Test
        public void testGetPaths() {
                CheckpointConfigHandler handler = new 
CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
                String[] paths = handler.getPaths();
@@ -50,26 +87,10 @@ public class CheckpointConfigHandlerTest {
         */
        @Test
        public void testSimpleConfig() throws Exception {
-               long interval = 18231823L;
-               long timeout = 996979L;
-               long minPause = 119191919L;
-               int maxConcurrent = 12929329;
-               ExternalizedCheckpointSettings externalized = 
ExternalizedCheckpointSettings.none();
+               GraphAndSettings graphAndSettings = 
createGraphAndSettings(false, true);
 
-               JobSnapshottingSettings settings = new JobSnapshottingSettings(
-                       Collections.<JobVertexID>emptyList(),
-                       Collections.<JobVertexID>emptyList(),
-                       Collections.<JobVertexID>emptyList(),
-                       interval,
-                       timeout,
-                       minPause,
-                       maxConcurrent,
-                       externalized,
-                       null,
-                       true);
-
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               when(graph.getJobSnapshottingSettings()).thenReturn(settings);
+               AccessExecutionGraph graph = graphAndSettings.graph;
+               JobSnapshottingSettings settings = 
graphAndSettings.snapshottingSettings;
 
                CheckpointConfigHandler handler = new 
CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
                String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap());
@@ -78,10 +99,10 @@ public class CheckpointConfigHandlerTest {
                JsonNode rootNode = mapper.readTree(json);
 
                assertEquals("exactly_once", rootNode.get("mode").asText());
-               assertEquals(interval, rootNode.get("interval").asLong());
-               assertEquals(timeout, rootNode.get("timeout").asLong());
-               assertEquals(minPause, rootNode.get("min_pause").asLong());
-               assertEquals(maxConcurrent, 
rootNode.get("max_concurrent").asInt());
+               assertEquals(settings.getCheckpointInterval(), 
rootNode.get("interval").asLong());
+               assertEquals(settings.getCheckpointTimeout(), 
rootNode.get("timeout").asLong());
+               assertEquals(settings.getMinPauseBetweenCheckpoints(), 
rootNode.get("min_pause").asLong());
+               assertEquals(settings.getMaxConcurrentCheckpoints(), 
rootNode.get("max_concurrent").asInt());
 
                JsonNode externalizedNode = rootNode.get("externalization");
                assertNotNull(externalizedNode);
@@ -93,20 +114,9 @@ public class CheckpointConfigHandlerTest {
         */
        @Test
        public void testAtLeastOnce() throws Exception {
-               JobSnapshottingSettings settings = new JobSnapshottingSettings(
-                       Collections.<JobVertexID>emptyList(),
-                       Collections.<JobVertexID>emptyList(),
-                       Collections.<JobVertexID>emptyList(),
-                       996979L,
-                       1818L,
-                       1212L,
-                       12,
-                       ExternalizedCheckpointSettings.none(),
-                       null,
-                       false); // at least once
+               GraphAndSettings graphAndSettings = 
createGraphAndSettings(false, false);
 
-               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-               when(graph.getJobSnapshottingSettings()).thenReturn(settings);
+               AccessExecutionGraph graph = graphAndSettings.graph;
 
                CheckpointConfigHandler handler = new 
CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
                String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap());
@@ -122,30 +132,60 @@ public class CheckpointConfigHandlerTest {
         */
        @Test
        public void testEnabledExternalizedCheckpointSettings() throws 
Exception {
-               ExternalizedCheckpointSettings externalizedSettings = 
ExternalizedCheckpointSettings.externalizeCheckpoints(true);
+               GraphAndSettings graphAndSettings = 
createGraphAndSettings(true, false);
+
+               AccessExecutionGraph graph = graphAndSettings.graph;
+               ExternalizedCheckpointSettings externalizedSettings = 
graphAndSettings.externalizedSettings;
+
+               CheckpointConfigHandler handler = new 
CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
+               String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap());
+
+               ObjectMapper mapper = new ObjectMapper();
+               JsonNode externalizedNode = 
mapper.readTree(json).get("externalization");
+               assertNotNull(externalizedNode);
+               assertEquals(externalizedSettings.externalizeCheckpoints(), 
externalizedNode.get("enabled").asBoolean());
+               assertEquals(externalizedSettings.deleteOnCancellation(), 
externalizedNode.get("delete_on_cancellation").asBoolean());
+       }
+
+       private static GraphAndSettings createGraphAndSettings(boolean 
externalized, boolean exactlyOnce) {
+               long interval = 18231823L;
+               long timeout = 996979L;
+               long minPause = 119191919L;
+               int maxConcurrent = 12929329;
+               ExternalizedCheckpointSettings externalizedSetting = 
externalized
+                       ? 
ExternalizedCheckpointSettings.externalizeCheckpoints(true)
+                       : ExternalizedCheckpointSettings.none();
 
                JobSnapshottingSettings settings = new JobSnapshottingSettings(
                        Collections.<JobVertexID>emptyList(),
                        Collections.<JobVertexID>emptyList(),
                        Collections.<JobVertexID>emptyList(),
-                       996979L,
-                       1818L,
-                       1212L,
-                       12,
-                       externalizedSettings,
+                       interval,
+                       timeout,
+                       minPause,
+                       maxConcurrent,
+                       externalizedSetting,
                        null,
-                       false); // at least once
+                       exactlyOnce);
 
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
                when(graph.getJobSnapshottingSettings()).thenReturn(settings);
 
-               CheckpointConfigHandler handler = new 
CheckpointConfigHandler(mock(ExecutionGraphHolder.class));
-               String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap());
+               return new GraphAndSettings(graph, settings, 
externalizedSetting);
+       }
 
-               ObjectMapper mapper = new ObjectMapper();
-               JsonNode externalizedNode = 
mapper.readTree(json).get("externalization");
-               assertNotNull(externalizedNode);
-               assertEquals(externalizedSettings.externalizeCheckpoints(), 
externalizedNode.get("enabled").asBoolean());
-               assertEquals(externalizedSettings.deleteOnCancellation(), 
externalizedNode.get("delete_on_cancellation").asBoolean());
+       private static class GraphAndSettings {
+               public final AccessExecutionGraph graph;
+               public final JobSnapshottingSettings snapshottingSettings;
+               public final ExternalizedCheckpointSettings 
externalizedSettings;
+
+               public GraphAndSettings(
+                               AccessExecutionGraph graph,
+                               JobSnapshottingSettings snapshottingSettings,
+                               ExternalizedCheckpointSettings 
externalizedSettings) {
+                       this.graph = graph;
+                       this.snapshottingSettings = snapshottingSettings;
+                       this.externalizedSettings = externalizedSettings;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
index ca9b606..770b032 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
@@ -32,12 +33,19 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
@@ -50,6 +58,44 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
 public class CheckpointStatsDetailsHandlerTest {
+       
+       @Test
+       public void testArchiver() throws IOException {
+               JsonArchivist archivist = new 
CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist();
+
+               CompletedCheckpointStats completedCheckpoint = 
createCompletedCheckpoint();
+               FailedCheckpointStats failedCheckpoint = 
createFailedCheckpoint();
+               List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
+               checkpoints.add(failedCheckpoint);
+               checkpoints.add(completedCheckpoint);
+               
+               CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
+               when(history.getCheckpoints()).thenReturn(checkpoints);
+               CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
+               when(snapshot.getHistory()).thenReturn(history);
+
+               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+               when(graph.getJobID()).thenReturn(new JobID());
+
+               ObjectMapper mapper = new ObjectMapper();
+               
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(graph);
+               Assert.assertEquals(2, archives.size());
+               
+               Iterator<ArchivedJson> iterator = archives.iterator();
+               ArchivedJson archive1 = iterator.next();
+               Assert.assertEquals(
+                       "/jobs/" + graph.getJobID() + "/checkpoints/details/" + 
failedCheckpoint.getCheckpointId(),
+                       archive1.getPath());
+               compareFailedCheckpoint(failedCheckpoint, 
mapper.readTree(archive1.getJson()));
+
+               ArchivedJson archive2 = iterator.next();
+               Assert.assertEquals(
+                       "/jobs/" + graph.getJobID() + "/checkpoints/details/" + 
completedCheckpoint.getCheckpointId(),
+                       archive2.getPath());
+               compareCompletedCheckpoint(completedCheckpoint, 
mapper.readTree(archive2.getJson()));
+       }
 
        @Test
        public void testGetPaths() {
@@ -146,8 +192,7 @@ public class CheckpointStatsDetailsHandlerTest {
                assertEquals(checkpoint.getNumberOfSubtasks(), 
rootNode.get("num_subtasks").asInt());
                assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), 
rootNode.get("num_acknowledged_subtasks").asInt());
 
-               verifyTaskNode(task1, rootNode);
-               verifyTaskNode(task2, rootNode);
+               verifyTaskNodes(taskStats, rootNode);
        }
 
        /**
@@ -155,6 +200,32 @@ public class CheckpointStatsDetailsHandlerTest {
         */
        @Test
        public void testCheckpointDetailsRequestCompletedCheckpoint() throws 
Exception {
+               CompletedCheckpointStats checkpoint = 
createCompletedCheckpoint();
+
+               JsonNode rootNode = triggerRequest(checkpoint);
+
+               compareCompletedCheckpoint(checkpoint, rootNode);
+
+               verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode);
+       }
+
+       /**
+        * Tests a checkpoint details request for a failed checkpoint.
+        */
+       @Test
+       public void testCheckpointDetailsRequestFailedCheckpoint() throws 
Exception {
+               FailedCheckpointStats checkpoint = createFailedCheckpoint();
+
+               JsonNode rootNode = triggerRequest(checkpoint);
+
+               compareFailedCheckpoint(checkpoint, rootNode);
+
+               verifyTaskNodes(checkpoint.getAllTaskStateStats(), rootNode);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       private static CompletedCheckpointStats createCompletedCheckpoint() {
                CompletedCheckpointStats checkpoint = 
mock(CompletedCheckpointStats.class);
                when(checkpoint.getCheckpointId()).thenReturn(1818213L);
                
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.COMPLETED);
@@ -177,8 +248,10 @@ public class CheckpointStatsDetailsHandlerTest {
 
                when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
 
-               JsonNode rootNode = triggerRequest(checkpoint);
+               return checkpoint;
+       }
 
+       private static void compareCompletedCheckpoint(CompletedCheckpointStats 
checkpoint, JsonNode rootNode) {
                assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong());
                assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
                assertEquals(checkpoint.getProperties().isSavepoint(), 
rootNode.get("is_savepoint").asBoolean());
@@ -191,18 +264,11 @@ public class CheckpointStatsDetailsHandlerTest {
                assertEquals(checkpoint.getExternalPath(), 
rootNode.get("external_path").asText());
                assertEquals(checkpoint.getNumberOfSubtasks(), 
rootNode.get("num_subtasks").asInt());
                assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), 
rootNode.get("num_acknowledged_subtasks").asInt());
-
-               verifyTaskNode(task1, rootNode);
-               verifyTaskNode(task2, rootNode);
        }
 
-       /**
-        * Tests a checkpoint details request for a failed checkpoint.
-        */
-       @Test
-       public void testCheckpointDetailsRequestFailedCheckpoint() throws 
Exception {
+       private static FailedCheckpointStats createFailedCheckpoint() {
                FailedCheckpointStats checkpoint = 
mock(FailedCheckpointStats.class);
-               when(checkpoint.getCheckpointId()).thenReturn(1818213L);
+               when(checkpoint.getCheckpointId()).thenReturn(1818214L);
                
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.FAILED);
                
when(checkpoint.getProperties()).thenReturn(CheckpointProperties.forStandardSavepoint());
                when(checkpoint.getTriggerTimestamp()).thenReturn(1818L);
@@ -223,8 +289,10 @@ public class CheckpointStatsDetailsHandlerTest {
 
                when(checkpoint.getAllTaskStateStats()).thenReturn(taskStats);
 
-               JsonNode rootNode = triggerRequest(checkpoint);
+               return checkpoint;
+       }
 
+       private static void compareFailedCheckpoint(FailedCheckpointStats 
checkpoint, JsonNode rootNode) {
                assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong());
                assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
                assertEquals(checkpoint.getProperties().isSavepoint(), 
rootNode.get("is_savepoint").asBoolean());
@@ -237,13 +305,8 @@ public class CheckpointStatsDetailsHandlerTest {
                assertEquals(checkpoint.getFailureMessage(), 
rootNode.get("failure_message").asText());
                assertEquals(checkpoint.getNumberOfSubtasks(), 
rootNode.get("num_subtasks").asInt());
                assertEquals(checkpoint.getNumberOfAcknowledgedSubtasks(), 
rootNode.get("num_acknowledged_subtasks").asInt());
-
-               verifyTaskNode(task1, rootNode);
-               verifyTaskNode(task2, rootNode);
        }
 
-       // 
------------------------------------------------------------------------
-
        private static JsonNode triggerRequest(AbstractCheckpointStats 
checkpoint) throws Exception {
                CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
                
when(history.getCheckpointById(anyLong())).thenReturn(checkpoint);
@@ -262,16 +325,18 @@ public class CheckpointStatsDetailsHandlerTest {
                return mapper.readTree(json);
        }
 
-       private static void verifyTaskNode(TaskStateStats task, JsonNode 
parentNode) {
-               long duration = ThreadLocalRandom.current().nextInt(128);
-
-               JsonNode taskNode = 
parentNode.get("tasks").get(task.getJobVertexId().toString());
-               assertEquals(task.getLatestAckTimestamp(), 
taskNode.get("latest_ack_timestamp").asLong());
-               assertEquals(task.getStateSize(), 
taskNode.get("state_size").asLong());
-               
assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), 
taskNode.get("end_to_end_duration").asLong());
-               assertEquals(task.getAlignmentBuffered(), 
taskNode.get("alignment_buffered").asLong());
-               assertEquals(task.getNumberOfSubtasks(), 
taskNode.get("num_subtasks").asInt());
-               assertEquals(task.getNumberOfAcknowledgedSubtasks(), 
taskNode.get("num_acknowledged_subtasks").asInt());
+       private static void verifyTaskNodes(Collection<TaskStateStats> tasks, 
JsonNode parentNode) {
+               for (TaskStateStats task : tasks) {
+                       long duration = 
ThreadLocalRandom.current().nextInt(128);
+
+                       JsonNode taskNode = 
parentNode.get("tasks").get(task.getJobVertexId().toString());
+                       assertEquals(task.getLatestAckTimestamp(), 
taskNode.get("latest_ack_timestamp").asLong());
+                       assertEquals(task.getStateSize(), 
taskNode.get("state_size").asLong());
+                       
assertEquals(task.getEndToEndDuration(task.getLatestAckTimestamp() - duration), 
taskNode.get("end_to_end_duration").asLong());
+                       assertEquals(task.getAlignmentBuffered(), 
taskNode.get("alignment_buffered").asLong());
+                       assertEquals(task.getNumberOfSubtasks(), 
taskNode.get("num_subtasks").asInt());
+                       assertEquals(task.getNumberOfAcknowledgedSubtasks(), 
taskNode.get("num_acknowledged_subtasks").asInt());
+               }
        }
 
        private static TaskStateStats createTaskStateStats() {

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
index ab7c7a3..1e4a255 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandlerTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointProperties;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsCounts;
@@ -34,10 +35,15 @@ import org.apache.flink.runtime.checkpoint.PendingCheckpointStats;
 import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -51,6 +57,32 @@ import static org.mockito.Mockito.when;
 public class CheckpointStatsHandlerTest {
 
        @Test
+       public void testArchiver() throws IOException {
+               JsonArchivist archivist = new 
CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist();
+               TestCheckpointStats testCheckpointStats = 
createTestCheckpointStats();
+               when(testCheckpointStats.graph.getJobID()).thenReturn(new 
JobID());
+               
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(testCheckpointStats.graph);
+               Assert.assertEquals(3, archives.size());
+
+               ObjectMapper mapper = new ObjectMapper();
+               
+               Iterator<ArchivedJson> iterator = archives.iterator();
+               ArchivedJson archive1 = iterator.next();
+               Assert.assertEquals("/jobs/" + 
testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + 
testCheckpointStats.inProgress.getCheckpointId(), archive1.getPath());
+               compareInProgressCheckpoint(testCheckpointStats.inProgress, 
mapper.readTree(archive1.getJson()));
+
+               ArchivedJson archive2 = iterator.next();
+               Assert.assertEquals("/jobs/" + 
testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + 
testCheckpointStats.completedSavepoint.getCheckpointId(), archive2.getPath());
+               
compareCompletedSavepoint(testCheckpointStats.completedSavepoint, 
mapper.readTree(archive2.getJson()));
+               
+               ArchivedJson archive3 = iterator.next();
+               Assert.assertEquals("/jobs/" + 
testCheckpointStats.graph.getJobID() + "/checkpoints/details/" + 
testCheckpointStats.failed.getCheckpointId(), archive3.getPath());
+               compareFailedCheckpoint(testCheckpointStats.failed, 
mapper.readTree(archive3.getJson()));
+       }
+       
+
+       @Test
        public void testGetPaths() {
                CheckpointStatsHandler handler = new 
CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
                String[] paths = handler.getPaths();
@@ -63,6 +95,18 @@ public class CheckpointStatsHandlerTest {
         */
        @Test
        public void testCheckpointStatsRequest() throws Exception {
+               TestCheckpointStats testCheckpointStats = 
createTestCheckpointStats();
+
+               CheckpointStatsHandler handler = new 
CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
+               String json = handler.handleRequest(testCheckpointStats.graph, 
Collections.<String, String>emptyMap());
+
+               ObjectMapper mapper = new ObjectMapper();
+               JsonNode rootNode = mapper.readTree(json);
+
+               compareCheckpointStats(testCheckpointStats, rootNode);
+       }
+
+       private static TestCheckpointStats createTestCheckpointStats() {
                // Counts
                CheckpointStatsCounts counts = 
mock(CheckpointStatsCounts.class);
                
when(counts.getNumberOfRestoredCheckpoints()).thenReturn(123123123L);
@@ -104,7 +148,7 @@ public class CheckpointStatsHandlerTest {
                
when(latestCompleted.getExternalPath()).thenReturn("latest-completed-external-path");
 
                CompletedCheckpointStats latestSavepoint = 
mock(CompletedCheckpointStats.class);
-               when(latestSavepoint.getCheckpointId()).thenReturn(1992139L);
+               when(latestSavepoint.getCheckpointId()).thenReturn(1992140L);
                
when(latestSavepoint.getTriggerTimestamp()).thenReturn(1919191900L);
                
when(latestSavepoint.getLatestAckTimestamp()).thenReturn(1977791901L);
                when(latestSavepoint.getStateSize()).thenReturn(111939272822L);
@@ -133,7 +177,7 @@ public class CheckpointStatsHandlerTest {
                List<AbstractCheckpointStats> checkpoints = new ArrayList<>();
 
                PendingCheckpointStats inProgress = 
mock(PendingCheckpointStats.class);
-               when(inProgress.getCheckpointId()).thenReturn(1992139L);
+               when(inProgress.getCheckpointId()).thenReturn(1992141L);
                
when(inProgress.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
                
when(inProgress.getProperties()).thenReturn(CheckpointProperties.forStandardCheckpoint());
                when(inProgress.getTriggerTimestamp()).thenReturn(1919191900L);
@@ -189,12 +233,15 @@ public class CheckpointStatsHandlerTest {
                AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
                when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
 
-               CheckpointStatsHandler handler = new 
CheckpointStatsHandler(mock(ExecutionGraphHolder.class));
-               String json = handler.handleRequest(graph, Collections.<String, 
String>emptyMap());
-
-               ObjectMapper mapper = new ObjectMapper();
-               JsonNode rootNode = mapper.readTree(json);
+               return new TestCheckpointStats(
+                       graph, counts, stateSizeSummary, durationSummary, 
alignmentBufferedSummary, summary,
+                       latestCompleted, latestSavepoint, latestFailed, 
latestRestored, inProgress, 
+                       completedSavepoint, failed, history, snapshot
+               );
+       }
 
+       private static void compareCheckpointStats(TestCheckpointStats 
checkpointStats, JsonNode rootNode) {
+               CheckpointStatsCounts counts = checkpointStats.counts;
                JsonNode countNode = rootNode.get("counts");
                assertEquals(counts.getNumberOfRestoredCheckpoints(), 
countNode.get("restored").asLong());
                assertEquals(counts.getTotalNumberOfCheckpoints(), 
countNode.get("total").asLong());
@@ -202,22 +249,26 @@ public class CheckpointStatsHandlerTest {
                assertEquals(counts.getNumberOfCompletedCheckpoints(), 
countNode.get("completed").asLong());
                assertEquals(counts.getNumberOfFailedCheckpoints(), 
countNode.get("failed").asLong());
 
+               MinMaxAvgStats stateSizeSummary = 
checkpointStats.stateSizeSummary;
                JsonNode summaryNode = rootNode.get("summary");
                JsonNode sizeSummaryNode = summaryNode.get("state_size");
                assertEquals(stateSizeSummary.getMinimum(), 
sizeSummaryNode.get("min").asLong());
                assertEquals(stateSizeSummary.getMaximum(), 
sizeSummaryNode.get("max").asLong());
                assertEquals(stateSizeSummary.getAverage(), 
sizeSummaryNode.get("avg").asLong());
 
+               MinMaxAvgStats durationSummary = 
checkpointStats.durationSummary;
                JsonNode durationSummaryNode = 
summaryNode.get("end_to_end_duration");
                assertEquals(durationSummary.getMinimum(), 
durationSummaryNode.get("min").asLong());
                assertEquals(durationSummary.getMaximum(), 
durationSummaryNode.get("max").asLong());
                assertEquals(durationSummary.getAverage(), 
durationSummaryNode.get("avg").asLong());
 
+               MinMaxAvgStats alignmentBufferedSummary = 
checkpointStats.alignmentBufferedSummary;
                JsonNode alignmentBufferedNode = 
summaryNode.get("alignment_buffered");
                assertEquals(alignmentBufferedSummary.getMinimum(), 
alignmentBufferedNode.get("min").asLong());
                assertEquals(alignmentBufferedSummary.getMaximum(), 
alignmentBufferedNode.get("max").asLong());
                assertEquals(alignmentBufferedSummary.getAverage(), 
alignmentBufferedNode.get("avg").asLong());
 
+               CompletedCheckpointStats latestCompleted = 
checkpointStats.latestCompleted;
                JsonNode latestNode = rootNode.get("latest");
                JsonNode latestCheckpointNode = latestNode.get("completed");
                assertEquals(latestCompleted.getCheckpointId(), 
latestCheckpointNode.get("id").asLong());
@@ -228,6 +279,7 @@ public class CheckpointStatsHandlerTest {
                assertEquals(latestCompleted.getAlignmentBuffered(), 
latestCheckpointNode.get("alignment_buffered").asLong());
                assertEquals(latestCompleted.getExternalPath(), 
latestCheckpointNode.get("external_path").asText());
 
+               CompletedCheckpointStats latestSavepoint = 
checkpointStats.latestSavepoint;
                JsonNode latestSavepointNode = latestNode.get("savepoint");
                assertEquals(latestSavepoint.getCheckpointId(), 
latestSavepointNode.get("id").asLong());
                assertEquals(latestSavepoint.getTriggerTimestamp(), 
latestSavepointNode.get("trigger_timestamp").asLong());
@@ -237,6 +289,7 @@ public class CheckpointStatsHandlerTest {
                assertEquals(latestSavepoint.getAlignmentBuffered(), 
latestSavepointNode.get("alignment_buffered").asLong());
                assertEquals(latestSavepoint.getExternalPath(), 
latestSavepointNode.get("external_path").asText());
 
+               FailedCheckpointStats latestFailed = 
checkpointStats.latestFailed;
                JsonNode latestFailedNode = latestNode.get("failed");
                assertEquals(latestFailed.getCheckpointId(), 
latestFailedNode.get("id").asLong());
                assertEquals(latestFailed.getTriggerTimestamp(), 
latestFailedNode.get("trigger_timestamp").asLong());
@@ -247,6 +300,7 @@ public class CheckpointStatsHandlerTest {
                assertEquals(latestFailed.getFailureTimestamp(), 
latestFailedNode.get("failure_timestamp").asLong());
                assertEquals(latestFailed.getFailureMessage(), 
latestFailedNode.get("failure_message").asText());
 
+               RestoredCheckpointStats latestRestored = 
checkpointStats.latestRestored;
                JsonNode latestRestoredNode = latestNode.get("restored");
                assertEquals(latestRestored.getCheckpointId(), 
latestRestoredNode.get("id").asLong());
                assertEquals(latestRestored.getRestoreTimestamp(), 
latestRestoredNode.get("restore_timestamp").asLong());
@@ -259,6 +313,25 @@ public class CheckpointStatsHandlerTest {
                assertTrue(it.hasNext());
                JsonNode inProgressNode = it.next();
 
+               PendingCheckpointStats inProgress = checkpointStats.inProgress;
+               compareInProgressCheckpoint(inProgress, inProgressNode);
+
+               assertTrue(it.hasNext());
+               JsonNode completedSavepointNode = it.next();
+
+               CompletedCheckpointStats completedSavepoint = 
checkpointStats.completedSavepoint;
+               compareCompletedSavepoint(completedSavepoint, 
completedSavepointNode);
+
+               assertTrue(it.hasNext());
+               JsonNode failedNode = it.next();
+
+               FailedCheckpointStats failed = checkpointStats.failed;
+               compareFailedCheckpoint(failed, failedNode);
+
+               assertFalse(it.hasNext());
+       }
+
+       private static void compareInProgressCheckpoint(PendingCheckpointStats 
inProgress, JsonNode inProgressNode) {
                assertEquals(inProgress.getCheckpointId(), 
inProgressNode.get("id").asLong());
                assertEquals(inProgress.getStatus().toString(), 
inProgressNode.get("status").asText());
                assertEquals(inProgress.getProperties().isSavepoint(), 
inProgressNode.get("is_savepoint").asBoolean());
@@ -269,10 +342,9 @@ public class CheckpointStatsHandlerTest {
                assertEquals(inProgress.getAlignmentBuffered(), 
inProgressNode.get("alignment_buffered").asLong());
                assertEquals(inProgress.getNumberOfSubtasks(), 
inProgressNode.get("num_subtasks").asInt());
                assertEquals(inProgress.getNumberOfAcknowledgedSubtasks(), 
inProgressNode.get("num_acknowledged_subtasks").asInt());
+       }
 
-               assertTrue(it.hasNext());
-               JsonNode completedSavepointNode = it.next();
-
+       private static void compareCompletedSavepoint(CompletedCheckpointStats 
completedSavepoint, JsonNode completedSavepointNode) {
                assertEquals(completedSavepoint.getCheckpointId(), 
completedSavepointNode.get("id").asLong());
                assertEquals(completedSavepoint.getStatus().toString(), 
completedSavepointNode.get("status").asText());
                assertEquals(completedSavepoint.getProperties().isSavepoint(), 
completedSavepointNode.get("is_savepoint").asBoolean());
@@ -286,10 +358,9 @@ public class CheckpointStatsHandlerTest {
 
                assertEquals(completedSavepoint.getExternalPath(), 
completedSavepointNode.get("external_path").asText());
                assertEquals(completedSavepoint.isDiscarded(), 
completedSavepointNode.get("discarded").asBoolean());
+       }
 
-               assertTrue(it.hasNext());
-               JsonNode failedNode = it.next();
-
+       private static void compareFailedCheckpoint(FailedCheckpointStats 
failed, JsonNode failedNode) {
                assertEquals(failed.getCheckpointId(), 
failedNode.get("id").asLong());
                assertEquals(failed.getStatus().toString(), 
failedNode.get("status").asText());
                assertEquals(failed.getProperties().isSavepoint(), 
failedNode.get("is_savepoint").asBoolean());
@@ -303,7 +374,56 @@ public class CheckpointStatsHandlerTest {
 
                assertEquals(failed.getFailureTimestamp(), 
failedNode.get("failure_timestamp").asLong());
                assertEquals(failed.getFailureMessage(), 
failedNode.get("failure_message").asText());
-
-               assertFalse(it.hasNext());
+       }
+       
+       private static class TestCheckpointStats { // read-only bundle of the objects returned by createTestCheckpointStats()
+               public final AccessExecutionGraph graph; // graph whose getCheckpointStatsSnapshot() is stubbed to return the snapshot
+               public final CheckpointStatsCounts counts; // expected values for the "counts" JSON node
+               public final MinMaxAvgStats stateSizeSummary; // expected values for "summary" -> "state_size"
+               public final MinMaxAvgStats durationSummary; // expected values for "summary" -> "end_to_end_duration"
+               public final MinMaxAvgStats alignmentBufferedSummary; // expected values for "summary" -> "alignment_buffered"
+               public final CompletedCheckpointStatsSummary summary;
+               public final CompletedCheckpointStats latestCompleted; // expected values for "latest" -> "completed"
+               public final CompletedCheckpointStats latestSavepoint; // expected values for "latest" -> "savepoint"
+               public final FailedCheckpointStats latestFailed; // expected values for "latest" -> "failed"
+               public final RestoredCheckpointStats latestRestored; // expected values for "latest" -> "restored"
+               public final PendingCheckpointStats inProgress; // checked via compareInProgressCheckpoint
+               public final CompletedCheckpointStats completedSavepoint; // checked via compareCompletedSavepoint
+               public final FailedCheckpointStats failed; // checked via compareFailedCheckpoint
+               public final CheckpointStatsHistory history;
+               public final CheckpointStatsSnapshot snapshot;
+
+               public TestCheckpointStats(
+                               AccessExecutionGraph graph,
+                               CheckpointStatsCounts counts,
+                               MinMaxAvgStats stateSizeSummary,
+                               MinMaxAvgStats durationSummary,
+                               MinMaxAvgStats alignmentBufferedSummary,
+                               CompletedCheckpointStatsSummary summary,
+                               CompletedCheckpointStats latestCompleted,
+                               CompletedCheckpointStats latestSavepoint,
+                               FailedCheckpointStats latestFailed,
+                               RestoredCheckpointStats latestRestored,
+                               PendingCheckpointStats inProgress,
+                               CompletedCheckpointStats completedSavepoint,
+                               FailedCheckpointStats failed,
+                               CheckpointStatsHistory history,
+                               CheckpointStatsSnapshot snapshot) { // plain field assignment below; nothing is validated or copied
+                       this.graph = graph;
+                       this.counts = counts;
+                       this.stateSizeSummary = stateSizeSummary;
+                       this.durationSummary = durationSummary;
+                       this.alignmentBufferedSummary = 
alignmentBufferedSummary;
+                       this.summary = summary;
+                       this.latestCompleted = latestCompleted;
+                       this.latestSavepoint = latestSavepoint;
+                       this.latestFailed = latestFailed;
+                       this.latestRestored = latestRestored;
+                       this.inProgress = inProgress;
+                       this.completedSavepoint = completedSavepoint;
+                       this.failed = failed;
+                       this.history = history;
+                       this.snapshot = snapshot;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
index 26433fa..bbab621 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -20,6 +20,7 @@ package 
org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
@@ -31,9 +32,13 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -55,6 +60,42 @@ import static org.mockito.Mockito.when;
 public class CheckpointStatsSubtaskDetailsHandlerTest {
 
        @Test
+       public void testArchiver() throws Exception { // archives a mocked single-checkpoint graph and checks the resulting path and JSON
+               JsonArchivist archivist = new 
CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist();
+               ObjectMapper mapper = new ObjectMapper();
+               // One mocked in-progress checkpoint with a fixed id.
+               PendingCheckpointStats checkpoint = 
mock(PendingCheckpointStats.class);
+               when(checkpoint.getCheckpointId()).thenReturn(1992139L);
+               
when(checkpoint.getStatus()).thenReturn(CheckpointStatsStatus.IN_PROGRESS);
+               when(checkpoint.getTriggerTimestamp()).thenReturn(0L); // ack 
timestamp = duration
+               // The checkpoint reports exactly one task; its stats come from createTaskStateStats(1237).
+               TaskStateStats task = createTaskStateStats(1237);
+               
when(checkpoint.getAllTaskStateStats()).thenReturn(Collections.singletonList(task));
+               // Wire checkpoint -> history -> snapshot -> graph; the archivist is only handed the graph.
+               CheckpointStatsHistory history = 
mock(CheckpointStatsHistory.class);
+               
when(history.getCheckpoints()).thenReturn(Collections.<AbstractCheckpointStats>singletonList(checkpoint));
+               CheckpointStatsSnapshot snapshot = 
mock(CheckpointStatsSnapshot.class);
+               when(snapshot.getHistory()).thenReturn(history);
+
+               AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
+               when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
+               when(graph.getJobID()).thenReturn(new JobID());
+               // Exactly one archive is expected for the single checkpoint/task pair.
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(graph);
+               Assert.assertEquals(1, archives.size());
+               // Its path must be built from the job id, the checkpoint id and the task's job vertex id.
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals(
+                       "/jobs/" + graph.getJobID() + "/checkpoints/details/" + 
checkpoint.getCheckpointId() + "/subtasks/" + task.getJobVertexId(),
+                       archive.getPath());
+               JsonNode rootNode = mapper.readTree(archive.getJson()); // the archived JSON must carry the checkpoint's id and status
+               assertEquals(checkpoint.getCheckpointId(), 
rootNode.get("id").asLong());
+               assertEquals(checkpoint.getStatus().toString(), 
rootNode.get("status").asText());
+               // Task-level fields are checked by the verifyTaskNode helper (defined outside this hunk).
+               verifyTaskNode(rootNode, task, 
checkpoint.getTriggerTimestamp());
+       }
+
+       @Test
        public void testGetPaths() {
                CheckpointStatsDetailsSubtasksHandler handler = new 
CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), new 
CheckpointStatsCache(0));
                String[] paths = handler.getPaths();

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
index 0340d87..ed339ed 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
@@ -105,7 +105,7 @@ public class ArchivedJobGenerationUtils {
                originalAttempt = new ArchivedExecutionBuilder()
                        .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 
9})
                        .setParallelSubtaskIndex(1)
-                       .setAttemptNumber(3)
+                       .setAttemptNumber(0)
                        .setAssignedResourceLocation(location)
                        .setUserAccumulators(new 
StringifiedAccumulatorResult[]{acc1, acc2})
                        .setState(ExecutionState.FINISHED)

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
new file mode 100644
index 0000000..22e011c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/ArchivedJson.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A simple container for a handler's JSON response and the REST URLs for which the response would've been returned.
+ * <p>
+ * These are created by {@link JsonArchivist}s, and used by the {@link MemoryArchivist} to create a directory structure
+ * resembling the REST API. Both fields are final and set once in the constructor; there are no setters.
+ */
+public class ArchivedJson {
+       private final String path; // REST URL path this response belongs to; passed through Preconditions.checkNotNull
+       private final String json; // the JSON response text; passed through Preconditions.checkNotNull
+       
+       public ArchivedJson(String path, String json) { // both arguments are required
+               this.path = Preconditions.checkNotNull(path);
+               this.json = Preconditions.checkNotNull(json);
+       }
+
+       public String getPath() { // returns the path exactly as given to the constructor
+               return path;
+       }
+
+       public String getJson() { // returns the JSON exactly as given to the constructor
+               return json;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
new file mode 100644
index 0000000..a87cc47
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/history/JsonArchivist.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.history;
+
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Interface for all classes that want to participate in the archiving of job-related json responses.
+ * <p>
+ * Note that all JsonArchivists that are to be used for the history server must be added
+ * to {@code WebRuntimeMonitor#getArchivers()}. NOTE(review): WebRuntimeMonitor is not imported by this file; confirm it resolves before making this a {@literal @}link again.
+ */
+public interface JsonArchivist {
+
+       /**
+        * Returns a {@link Collection} of {@link ArchivedJson}s containing JSON responses and their respective REST URL
+        * for a given job.
+        *
+        * <p>The collection should contain one entry for every response that could be generated for the given
+        * job, for example one entry for each task. The REST URLs should be unique and must not contain placeholders.
+        *
+        * @param graph AccessExecutionGraph for which the responses should be generated
+        *
+        * @return Collection containing an ArchivedJson for every response that could be generated for the given job
+        * @throws IOException thrown if the JSON generation fails
+        */
+       Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph 
graph) throws IOException;
+}

Reply via email to