[FLINK-5941] Integrate Archiver pattern into handlers

This closes #3444.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fe0eb47
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fe0eb47
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fe0eb47

Branch: refs/heads/master
Commit: 7fe0eb477df52cfd7254695a67d41f3cba34ef0a
Parents: 243ef69
Author: zentol <[email protected]>
Authored: Mon Feb 20 16:30:35 2017 +0100
Committer: zentol <[email protected]>
Committed: Thu Mar 2 18:27:15 2017 +0100

----------------------------------------------------------------------
 .../runtime/webmonitor/WebRuntimeMonitor.java   |  42 ++++++
 .../handlers/CurrentJobsOverviewHandler.java    |  26 ++++
 .../handlers/JobAccumulatorsHandler.java        |  15 ++
 .../webmonitor/handlers/JobConfigHandler.java   |  16 ++
 .../webmonitor/handlers/JobDetailsHandler.java  |  20 +++
 .../handlers/JobExceptionsHandler.java          |  15 ++
 .../webmonitor/handlers/JobPlanHandler.java     |  16 ++
 .../handlers/JobVertexAccumulatorsHandler.java  |  22 +++
 .../handlers/JobVertexDetailsHandler.java       |  22 +++
 .../handlers/JobVertexTaskManagersHandler.java  |  20 +++
 ...taskExecutionAttemptAccumulatorsHandler.java |  40 +++++
 .../SubtaskExecutionAttemptDetailsHandler.java  |  47 ++++++
 .../SubtasksAllAccumulatorsHandler.java         |  22 +++
 .../handlers/SubtasksTimesHandler.java          |  22 +++
 .../checkpoints/CheckpointConfigHandler.java    |  15 ++
 .../CheckpointStatsDetailsHandler.java          |  28 ++++
 .../CheckpointStatsDetailsSubtasksHandler.java  |  31 ++++
 .../checkpoints/CheckpointStatsHandler.java     |  15 ++
 .../CurrentJobsOverviewHandlerTest.java         |  32 +++-
 .../handlers/JobAccumulatorsHandlerTest.java    |  23 +++
 .../handlers/JobConfigHandlerTest.java          |  21 +++
 .../handlers/JobDetailsHandlerTest.java         |  28 ++++
 .../handlers/JobExceptionsHandlerTest.java      |  23 +++
 .../webmonitor/handlers/JobPlanHandlerTest.java |  20 +++
 .../JobVertexAccumulatorsHandlerTest.java       |  25 ++++
 .../handlers/JobVertexDetailsHandlerTest.java   |  25 ++++
 .../JobVertexTaskManagersHandlerTest.java       |  26 ++++
 ...ExecutionAttemptAccumulatorsHandlerTest.java |  33 ++++
 ...btaskExecutionAttemptDetailsHandlerTest.java |  40 +++++
 .../SubtasksAllAccumulatorsHandlerTest.java     |  25 ++++
 .../handlers/SubtasksTimesHandlerTest.java      |  26 ++++
 .../CheckpointConfigHandlerTest.java            | 140 ++++++++++-------
 .../CheckpointStatsDetailsHandlerTest.java      | 121 +++++++++++----
 .../checkpoints/CheckpointStatsHandlerTest.java | 150 +++++++++++++++++--
 ...heckpointStatsSubtaskDetailsHandlerTest.java |  41 +++++
 .../utils/ArchivedJobGenerationUtils.java       |   2 +-
 .../webmonitor/history/ArchivedJson.java        |  45 ++++++
 .../webmonitor/history/JsonArchivist.java       |  46 ++++++
 38 files changed, 1231 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
index dddc69d..e604ce8 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java
@@ -37,6 +37,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.jobmanager.MemoryArchivist;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.net.SSLUtils;
 import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler;
@@ -77,6 +78,7 @@ import 
org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsC
 import 
org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsHandler;
 import 
org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler;
 import 
org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsSubtasksHandler;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.JobManagerMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler;
 import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler;
@@ -424,6 +426,46 @@ public class WebRuntimeMonitor implements WebMonitor {
                LOG.info("Web frontend listening at " + address + ':' + port);
        }
 
+       /**
+        * Returns an array of all {@link JsonArchivist}s that are relevant for the history server.
+        * 
+        * This method is static to allow easier access from the {@link MemoryArchivist}. Requiring a reference
+        * would imply that the WebRuntimeMonitor is always created before the archivist, which may not hold for all
+        * deployment modes.
+        * 
+        * Similarly, no handler implements the JsonArchivist interface itself but instead contains a separate implementing
+        * class; otherwise we would either instantiate several handlers even though their main functionality isn't
+        * required, or yet again require that the WebRuntimeMonitor is started before the archivist.
+        * 
+        * @return array of all JsonArchivists relevant for the history server
+        */
+       public static JsonArchivist[] getArchivers() {
+               JsonArchivist[] archivists = new JsonArchivist[]{
+                       new 
CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(),
+
+                       new JobPlanHandler.JobPlanJsonArchivist(),
+                       new JobConfigHandler.JobConfigJsonArchivist(),
+                       new JobExceptionsHandler.JobExceptionsJsonArchivist(),
+                       new JobDetailsHandler.JobDetailsJsonArchivist(),
+                       new 
JobAccumulatorsHandler.JobAccumulatorsJsonArchivist(),
+
+                       new 
CheckpointStatsHandler.CheckpointStatsJsonArchivist(),
+                       new 
CheckpointConfigHandler.CheckpointConfigJsonArchivist(),
+                       new 
CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(),
+                       new 
CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist(),
+                               
+                       new 
JobVertexDetailsHandler.JobVertexDetailsJsonArchivist(),
+                       new SubtasksTimesHandler.SubtasksTimesJsonArchivist(),
+                       new 
JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist(),
+                       new 
JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist(),
+                       new 
SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist(),
+                       
+                       new 
SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist(),
+                       new 
SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist()
+                       };
+               return archivists;
+       }
+
        @Override
        public void start(String jobManagerAkkaUrl) throws Exception {
                LOG.info("Starting with JobManager {} on port {}", 
jobManagerAkkaUrl, getServerPort());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 00cf138..60a2b27 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -20,16 +20,22 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -121,6 +127,26 @@ public class CurrentJobsOverviewHandler extends 
AbstractJsonRequestHandler {
                }
        }
 
+       public static class CurrentJobsOverviewJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       StringWriter writer = new StringWriter();
+                       try (JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer)) {
+                               gen.writeStartObject();
+                               gen.writeArrayFieldStart("running");
+                               gen.writeEndArray();
+                               gen.writeArrayFieldStart("finished");
+                               
writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, 
System.currentTimeMillis());
+                               gen.writeEndArray();
+                               gen.writeEndObject();
+                       }
+                       String json = writer.toString();
+                       String path = ALL_JOBS_REST_PATH;
+                       return Collections.singleton(new ArchivedJson(path, 
json));
+               }
+       }
+
        public static void writeJobDetailOverviewAsJson(JobDetails details, 
JsonGenerator gen, long now) throws IOException {
                gen.writeStartObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index dfc654e..c403aa2 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -22,9 +22,13 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -48,6 +52,17 @@ public class JobAccumulatorsHandler extends 
AbstractExecutionGraphRequestHandler
                return createJobAccumulatorsJson(graph);
        }
 
+       public static class JobAccumulatorsJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String json = createJobAccumulatorsJson(graph);
+                       String path = JOB_ACCUMULATORS_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       return Collections.singletonList(new ArchivedJson(path, 
json));
+               }
+       }
+
        public static String createJobAccumulatorsJson(AccessExecutionGraph 
graph) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 7d72235..2b96456 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -26,6 +26,11 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+
+import java.util.Collection;
+import java.util.Collections;
 
 /**
  * Request handler that returns the execution config of a job.
@@ -48,6 +53,17 @@ public class JobConfigHandler extends 
AbstractExecutionGraphRequestHandler {
                return createJobConfigJson(graph);
        }
 
+       public static class JobConfigJsonArchivist implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String json = createJobConfigJson(graph);
+                       String path = JOB_CONFIG_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       return Collections.singletonList(new ArchivedJson(path, 
json));
+               }
+       }
+
        public static String createJobConfigJson(AccessExecutionGraph graph) 
throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 6d1f82f..029a4b5 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -27,12 +27,16 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -67,6 +71,22 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphRequestHandler {
                return createJobDetailsJson(graph, fetcher);
        }
 
+       public static class JobDetailsJsonArchivist implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String json = createJobDetailsJson(graph, null);
+                       String path1 = JOB_DETAILS_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       String path2 = JOB_DETAILS_VERTICES_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       Collection<ArchivedJson> archives = new ArrayList();
+                       archives.add(new ArchivedJson(path1, json));
+                       archives.add(new ArchivedJson(path2, json));
+                       return archives;
+               }
+       }
+
        public static String createJobDetailsJson(AccessExecutionGraph graph, 
@Nullable MetricFetcher fetcher) throws IOException {
                final StringWriter writer = new StringWriter();
                final JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index 0cce61f..81cdc83 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -23,10 +23,14 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.util.ExceptionUtils;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -52,6 +56,17 @@ public class JobExceptionsHandler extends 
AbstractExecutionGraphRequestHandler {
                return createJobExceptionsJson(graph);
        }
 
+       public static class JobExceptionsJsonArchivist implements JsonArchivist 
{
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String json = createJobExceptionsJson(graph);
+                       String path = JOB_EXCEPTIONS_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       return Collections.singletonList(new ArchivedJson(path, 
json));
+               }
+       }
+
        public static String createJobExceptionsJson(AccessExecutionGraph 
graph) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
index becc2e1..885d04e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java
@@ -20,7 +20,12 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -43,4 +48,15 @@ public class JobPlanHandler extends 
AbstractExecutionGraphRequestHandler {
        public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
                return graph.getJsonPlan();
        }
+
+       public static class JobPlanJsonArchivist implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String path = JOB_PLAN_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       String json = graph.getJsonPlan();
+                       return Collections.singletonList(new ArchivedJson(path, 
json));
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index ca0488b..2532a1e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -21,11 +21,17 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 
@@ -47,6 +53,22 @@ public class JobVertexAccumulatorsHandler extends 
AbstractJobVertexRequestHandle
                return createVertexAccumulatorsJson(jobVertex);
        }
 
+       public static class JobVertexAccumulatorsJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
+                               String json = 
createVertexAccumulatorsJson(task);
+                               String path = JOB_VERTEX_ACCUMULATORS_REST_PATH
+                                       .replace(":jobid", 
graph.getJobID().toString())
+                                       .replace(":vertexid", 
task.getJobVertexId().toString());
+                               archive.add(new ArchivedJson(path, json));
+                       }
+                       return archive;
+               }
+       }
+
        public static String 
createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws 
IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 6e7e47c..d9a1131 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -21,16 +21,22 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -58,6 +64,22 @@ public class JobVertexDetailsHandler extends 
AbstractJobVertexRequestHandler {
                return createVertexDetailsJson(jobVertex, params.get("jobid"), 
fetcher);
        }
 
+       public static class JobVertexDetailsJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
+                               String json = createVertexDetailsJson(task, 
graph.getJobID().toString(), null);
+                               String path = JOB_VERTEX_DETAILS_REST_PATH
+                                       .replace(":jobid", 
graph.getJobID().toString())
+                                       .replace(":vertexid", 
task.getJobVertexId().toString());
+                               archive.add(new ArchivedJson(path, json));
+                       }
+                       return archive;
+               }
+       }
+
        public static String createVertexDetailsJson(
                        AccessExecutionJobVertex jobVertex,
                        String jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index 4fa54bd..3878722 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
@@ -32,6 +35,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -61,6 +65,22 @@ public class JobVertexTaskManagersHandler extends 
AbstractJobVertexRequestHandle
                return createVertexDetailsByTaskManagerJson(jobVertex, 
params.get("jobid"), fetcher);
        }
 
+       public static class JobVertexTaskManagersJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
+                               String json = 
createVertexDetailsByTaskManagerJson(task, graph.getJobID().toString(), null);
+                               String path = JOB_VERTEX_TASKMANAGERS_REST_PATH
+                                       .replace(":jobid", 
graph.getJobID().toString())
+                                       .replace(":vertexid", 
task.getJobVertexId().toString());
+                               archive.add(new ArchivedJson(path, json));
+                       }
+                       return archive;
+               }
+       }
+
        public static String createVertexDetailsByTaskManagerJson(
                        AccessExecutionJobVertex jobVertex,
                        String jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index a63016c..9026a22 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -21,10 +21,18 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -49,6 +57,38 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
                return createAttemptAccumulatorsJson(execAttempt);
        }
                
+       public static class SubtaskExecutionAttemptAccumulatorsJsonArchivist 
implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
+                               for (AccessExecutionVertex subtask : 
task.getTaskVertices()) {
+                                       String curAttemptJson = 
createAttemptAccumulatorsJson(subtask.getCurrentExecutionAttempt());
+                                       String curAttemptPath = 
SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH
+                                               .replace(":jobid", 
graph.getJobID().toString())
+                                               .replace(":vertexid", 
task.getJobVertexId().toString())
+                                               .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                               .replace(":attempt", 
String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber()));
+
+                                       archive.add(new 
ArchivedJson(curAttemptPath, curAttemptJson));
+
+                                       for (int x = 0; x < 
subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
+                                               AccessExecution attempt = 
subtask.getPriorExecutionAttempt(x);
+                                               String json = 
createAttemptAccumulatorsJson(attempt);
+                                               String path = 
SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH
+                                                       .replace(":jobid", 
graph.getJobID().toString())
+                                                       .replace(":vertexid", 
task.getJobVertexId().toString())
+                                                       .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                                       .replace(":attempt", 
String.valueOf(attempt.getAttemptNumber()));
+                                               archive.add(new 
ArchivedJson(path, json));
+                                       }
+                               }
+                       }
+                       return archive;
+               }
+       }
+
        public static String createAttemptAccumulatorsJson(AccessExecution 
execAttempt) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index 5af6af9..078f54a 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -22,16 +22,26 @@ import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
 import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
+import static 
org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH;
+
 /**
  * Request handler providing details about a single task execution attempt.
  */
@@ -56,6 +66,43 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
                return createAttemptDetailsJson(execAttempt, 
params.get("jobid"), params.get("vertexid"), fetcher);
        }
 
+       public static class SubtaskExecutionAttemptDetailsJsonArchivist 
implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
+                               for (AccessExecutionVertex subtask : 
task.getTaskVertices()) {
+                                       String curAttemptJson = 
createAttemptDetailsJson(subtask.getCurrentExecutionAttempt(), 
graph.getJobID().toString(), task.getJobVertexId().toString(), null);
+                                       String curAttemptPath1 = 
SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH
+                                               .replace(":jobid", 
graph.getJobID().toString())
+                                               .replace(":vertexid", 
task.getJobVertexId().toString())
+                                               .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()));
+                                       String curAttemptPath2 = 
SUBTASK_ATTEMPT_DETAILS_REST_PATH
+                                               .replace(":jobid", 
graph.getJobID().toString())
+                                               .replace(":vertexid", 
task.getJobVertexId().toString())
+                                               .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                               .replace(":attempt", 
String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber()));
+                                       
+                                       archive.add(new 
ArchivedJson(curAttemptPath1, curAttemptJson));
+                                       archive.add(new 
ArchivedJson(curAttemptPath2, curAttemptJson));
+
+                                       for (int x = 0; x < 
subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
+                                               AccessExecution attempt = 
subtask.getPriorExecutionAttempt(x);
+                                               String json = 
createAttemptDetailsJson(attempt, graph.getJobID().toString(), 
task.getJobVertexId().toString(), null);
+                                               String path = 
SUBTASK_ATTEMPT_DETAILS_REST_PATH
+                                                       .replace(":jobid", 
graph.getJobID().toString())
+                                                       .replace(":vertexid", 
task.getJobVertexId().toString())
+                                                       .replace(":subtasknum", 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                                       .replace(":attempt", 
String.valueOf(attempt.getAttemptNumber()));
+                                               archive.add(new 
ArchivedJson(path, json));
+                                       }
+                               }
+                       }
+                       return archive;
+               }
+       }
+
        public static String createAttemptDetailsJson(
                        AccessExecution execAttempt,
                        String jobID,

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 10a8773..6c3bc18 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -21,13 +21,19 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -51,6 +57,22 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
                return createSubtasksAccumulatorsJson(jobVertex);
        }
 
+       public static class SubtasksAllAccumulatorsJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
+                               String json = 
createSubtasksAccumulatorsJson(task);
+                               String path = 
SUBTASKS_ALL_ACCUMULATORS_REST_PATH
+                                       .replace(":jobid", 
graph.getJobID().toString())
+                                       .replace(":vertexid", 
task.getJobVertexId().toString());
+                               archive.add(new ArchivedJson(path, json));
+                       }
+                       return archive;
+               }
+       }
+
        public static String 
createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws 
IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index 08bd722..adefa80 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -21,13 +21,19 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -52,6 +58,22 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
                return createSubtaskTimesJson(jobVertex);
        }
 
+       public static class SubtasksTimesJsonArchivist implements JsonArchivist 
{
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
+                               String json = createSubtaskTimesJson(task);
+                               String path = SUBTASK_TIMES_REST_PATH
+                                       .replace(":jobid", 
graph.getJobID().toString())
+                                       .replace(":vertexid", 
task.getJobVertexId().toString());
+                               archive.add(new ArchivedJson(path, json));
+                       }
+                       return archive;
+               }
+       }
+
        public static String createSubtaskTimesJson(AccessExecutionJobVertex 
jobVertex) throws IOException {
                final long now = System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index 9976298..947b7c3 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -25,9 +25,13 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import 
org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -51,6 +55,17 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
                return createCheckpointConfigJson(graph);
        }
 
+       public static class CheckpointConfigJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String json = createCheckpointConfigJson(graph);
+                       String path = CHECKPOINT_CONFIG_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       return Collections.singletonList(new ArchivedJson(path, 
json));
+               }
+       }
+
        private static String createCheckpointConfigJson(AccessExecutionGraph 
graph) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index 4bbb8f6..16fd9bd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats;
 import org.apache.flink.runtime.checkpoint.FailedCheckpointStats;
@@ -28,9 +29,15 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import 
org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -79,6 +86,27 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
                return createCheckpointDetailsJson(checkpoint);
        }
 
+       public static class CheckpointStatsDetailsJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       CheckpointStatsSnapshot stats = 
graph.getCheckpointStatsSnapshot();
+                       if (stats == null) {
+                               return Collections.emptyList();
+                       }
+                       CheckpointStatsHistory history = stats.getHistory();
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AbstractCheckpointStats checkpoint : 
history.getCheckpoints()) {
+                               String json = 
createCheckpointDetailsJson(checkpoint);
+                               String path = CHECKPOINT_STATS_DETAILS_REST_PATH
+                                       .replace(":jobid", 
graph.getJobID().toString())
+                                       .replace(":checkpointid", 
String.valueOf(checkpoint.getCheckpointId()));
+                               archive.add(new ArchivedJson(path, json));
+                       }
+                       return archive;
+               }
+       }
+
        public static String 
createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws 
IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index b28ecef..bb39b2c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
 import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
@@ -31,9 +32,15 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import 
org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
 import 
org.apache.flink.runtime.webmonitor.handlers.AbstractJobVertexRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
 import static 
org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg;
@@ -104,6 +111,30 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
                return createSubtaskCheckpointDetailsJson(checkpoint, 
taskStats);
        }
 
+       public static class CheckpointStatsDetailsSubtasksJsonArchivist 
implements JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       CheckpointStatsSnapshot stats = 
graph.getCheckpointStatsSnapshot();
+                       if (stats == null) {
+                               return Collections.emptyList();
+                       }
+                       CheckpointStatsHistory history = stats.getHistory();
+                       List<ArchivedJson> archive = new ArrayList<>();
+                       for (AbstractCheckpointStats checkpoint : 
history.getCheckpoints()) {
+                               for (TaskStateStats subtaskStats : 
checkpoint.getAllTaskStateStats()) {
+                                       String json = 
createSubtaskCheckpointDetailsJson(checkpoint, subtaskStats);
+                                       String path = 
CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH
+                                               .replace(":jobid", 
graph.getJobID().toString())
+                                               .replace(":checkpointid", 
String.valueOf(checkpoint.getCheckpointId()))
+                                               .replace(":vertexid", 
subtaskStats.getJobVertexId().toString());
+                                       archive.add(new ArchivedJson(path, 
json));
+                               }
+                       }
+                       return archive;
+               }
+       }
+
        private static String 
createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, 
TaskStateStats taskStats) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 585ab26..f004888 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -32,10 +32,14 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import 
org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 
 /**
@@ -59,6 +63,17 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
                return createCheckpointStatsJson(graph);
        }
 
+       public static class CheckpointStatsJsonArchivist implements 
JsonArchivist {
+
+               @Override
+               public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+                       String json = createCheckpointStatsJson(graph);
+                       String path = CHECKPOINT_STATS_REST_PATH
+                               .replace(":jobid", graph.getJobID().toString());
+                       return Collections.singletonList(new ArchivedJson(path, 
json));
+               }
+       }
+
        private static String createCheckpointStatsJson(AccessExecutionGraph 
graph) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
index caf6d8e..097961e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
@@ -19,19 +19,47 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.core.JsonGenerator;
 import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Collection;
 import java.util.concurrent.TimeUnit;
 
 public class CurrentJobsOverviewHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               JobDetails expectedDetails = 
WebMonitorUtils.createDetailsForJob(originalJob);
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/joboverview", archive.getPath());
+
+               JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(archive.getJson());
+               ArrayNode running = (ArrayNode) result.get("running");
+               Assert.assertEquals(0, running.size());
+
+               ArrayNode finished = (ArrayNode) result.get("finished");
+               Assert.assertEquals(1, finished.size());
+
+               compareJobOverview(expectedDetails, finished.get(0).toString());
+       }
+
        @Test
        public void testGetPaths() {
                CurrentJobsOverviewHandler handlerAll = new 
CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, true);
@@ -58,8 +86,10 @@ public class CurrentJobsOverviewHandlerTest {
                try (JsonGenerator gen = 
ArchivedJobGenerationUtils.jacksonFactory.createGenerator(writer)) {
                        
CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 
0);
                }
-               String answer = writer.toString();
+               compareJobOverview(expectedDetails, writer.toString());
+       }
 
+       private static void compareJobOverview(JobDetails expectedDetails, 
String answer) throws IOException {
                JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(answer);
 
                Assert.assertEquals(expectedDetails.getJobId().toString(), 
result.get("jid").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
index 34748b7..f8ea792 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
@@ -20,11 +20,30 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class JobAccumulatorsHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
JobAccumulatorsHandler.JobAccumulatorsJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/accumulators", archive.getPath());
+               compareAccumulators(originalJob, archive.getJson());
+       }
+
        @Test
        public void testGetPaths() {
                JobAccumulatorsHandler handler = new 
JobAccumulatorsHandler(null);
@@ -38,6 +57,10 @@ public class JobAccumulatorsHandlerTest {
                AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
                String json = 
JobAccumulatorsHandler.createJobAccumulatorsJson(originalJob);
 
+               compareAccumulators(originalJob, json);
+       }
+
+       private static void compareAccumulators(AccessExecutionGraph 
originalJob, String json) throws IOException {
                JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
 
                ArrayNode accs = (ArrayNode) result.get("job-accumulators");

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
index f304efe..f47b8ca 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
@@ -20,13 +20,31 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.flink.api.common.ArchivedExecutionConfig;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
 import java.util.Map;
 
 public class JobConfigHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
JobConfigHandler.JobConfigJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/config", archive.getPath());
+               compareJobConfig(originalJob, archive.getJson());
+       }
+
        @Test
        public void testGetPaths() {
                JobConfigHandler handler = new JobConfigHandler(null);
@@ -38,7 +56,10 @@ public class JobConfigHandlerTest {
        public void testJsonGeneration() throws Exception {
                AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
                String answer = 
JobConfigHandler.createJobConfigJson(originalJob);
+               compareJobConfig(originalJob, answer);
+       }
 
+       private static void compareJobConfig(AccessExecutionGraph originalJob, 
String answer) throws IOException {
                JsonNode job = 
ArchivedJobGenerationUtils.mapper.readTree(answer);
 
                Assert.assertEquals(originalJob.getJobID().toString(), 
job.get("jid").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
index 3f80d12..0c4fb7e 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
@@ -26,13 +26,37 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 
 public class JobDetailsHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
JobDetailsHandler.JobDetailsJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(2, archives.size());
+
+               Iterator<ArchivedJson> iterator = archives.iterator();
+               ArchivedJson archive1 = iterator.next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID(), 
archive1.getPath());
+               compareJobDetails(originalJob, archive1.getJson());
+
+               ArchivedJson archive2 = iterator.next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/vertices", archive2.getPath());
+               compareJobDetails(originalJob, archive2.getJson());
+       }
+
        @Test
        public void testGetPaths() {
                JobDetailsHandler handler = new JobDetailsHandler(null, null);
@@ -48,6 +72,10 @@ public class JobDetailsHandlerTest {
                AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
                String json = 
JobDetailsHandler.createJobDetailsJson(originalJob, null);
 
+               compareJobDetails(originalJob, json);
+       }
+
+       private static void compareJobDetails(AccessExecutionGraph originalJob, 
String json) throws IOException {
                JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
 
                Assert.assertEquals(originalJob.getJobID().toString(), 
result.get("jid").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
index c86ce6a..c51053a 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
@@ -22,12 +22,31 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class JobExceptionsHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
JobExceptionsHandler.JobExceptionsJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/exceptions", archive.getPath());
+               compareExceptions(originalJob, archive.getJson());
+       }
+
        @Test
        public void testGetPaths() {
                JobExceptionsHandler handler = new JobExceptionsHandler(null);
@@ -41,6 +60,10 @@ public class JobExceptionsHandlerTest {
                AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
                String json = 
JobExceptionsHandler.createJobExceptionsJson(originalJob);
 
+               compareExceptions(originalJob, json);
+       }
+
+       private static void compareExceptions(AccessExecutionGraph originalJob, 
String json) throws IOException {
                JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
 
                Assert.assertEquals(originalJob.getFailureCauseAsString(), 
result.get("root-exception").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
index 42808ed..2ef5bb9 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java
@@ -17,10 +17,30 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collection;
+
 public class JobPlanHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
JobPlanHandler.JobPlanJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/plan", archive.getPath());
+               Assert.assertEquals(originalJob.getJsonPlan(), 
archive.getJson());
+       }
+
        @Test
        public void testGetPaths() {
                JobPlanHandler handler = new JobPlanHandler(null);

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
index 03c1896..8c88da8 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
@@ -20,12 +20,33 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class JobVertexAccumulatorsHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/vertices/" + originalTask.getJobVertexId() + "/accumulators", 
archive.getPath());
+               compareAccumulators(originalTask, archive.getJson());
+       }
+
        @Test
        public void testGetPaths() {
                JobVertexAccumulatorsHandler handler = new 
JobVertexAccumulatorsHandler(null);
@@ -39,6 +60,10 @@ public class JobVertexAccumulatorsHandlerTest {
                AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
                String json = 
JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(originalTask);
 
+               compareAccumulators(originalTask, json);
+       }
+
+       private static void compareAccumulators(AccessExecutionJobVertex 
originalTask, String json) throws IOException {
                JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
 
                Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
index e909c8c..0fae8b5 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
@@ -20,14 +20,35 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class JobVertexDetailsHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
JobVertexDetailsHandler.JobVertexDetailsJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/vertices/" + originalTask.getJobVertexId(), archive.getPath());
+               compareVertexDetails(originalTask, archive.getJson());
+       }
+
        @Test
        public void testGetPaths() {
                JobVertexDetailsHandler handler = new 
JobVertexDetailsHandler(null, null);
@@ -42,6 +63,10 @@ public class JobVertexDetailsHandlerTest {
                String json = JobVertexDetailsHandler.createVertexDetailsJson(
                        originalTask, 
ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
 
+               compareVertexDetails(originalTask, json);
+       }
+
+       private static void compareVertexDetails(AccessExecutionJobVertex 
originalTask, String json) throws IOException {
                JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
 
                Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
index 11e35e5..9271712 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
@@ -20,15 +20,37 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class JobVertexTaskManagersHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
+               AccessExecutionVertex originalSubtask = 
ArchivedJobGenerationUtils.getTestSubtask();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/vertices/" + originalTask.getJobVertexId() + "/taskmanagers", 
archive.getPath());
+               compareVertexTaskManagers(originalTask, originalSubtask, 
archive.getJson());
+       }
+
        @Test
        public void testGetPaths() {
                JobVertexTaskManagersHandler handler = new 
JobVertexTaskManagersHandler(null, null);
@@ -44,6 +66,10 @@ public class JobVertexTaskManagersHandlerTest {
                String json = 
JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson(
                        originalTask, 
ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
 
+               compareVertexTaskManagers(originalTask, originalSubtask, json);
+       }
+
+       private static void compareVertexTaskManagers(AccessExecutionJobVertex 
originalTask, AccessExecutionVertex originalSubtask, String json) throws 
IOException {
                JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
 
                Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index 8d24bd0..5993d5c 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -20,11 +20,40 @@ package org.apache.flink.runtime.webmonitor.handlers;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
+               AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals(
+                       "/jobs/" + originalJob.getJobID() +
+                       "/vertices/" + originalTask.getJobVertexId() +
+                       "/subtasks/" + 
originalAttempt.getParallelSubtaskIndex() +
+                       "/attempts/" + originalAttempt.getAttemptNumber() + 
+                       "/accumulators",
+                       archive.getPath());
+               compareAttemptAccumulators(originalAttempt, archive.getJson());
+       }
+
        @Test
        public void testGetPaths() {
                SubtaskExecutionAttemptAccumulatorsHandler handler = new 
SubtaskExecutionAttemptAccumulatorsHandler(null);
@@ -38,6 +67,10 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
                AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
                String json = 
SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt);
 
+               compareAttemptAccumulators(originalAttempt, json);
+       }
+
+       private static void compareAttemptAccumulators(AccessExecution 
originalAttempt, String json) throws IOException {
                JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
 
                Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), 
result.get("subtask").asInt());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
index 54f3f9c..f18858e 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -22,11 +22,47 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+
 public class SubtaskExecutionAttemptDetailsHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
+               AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(2, archives.size());
+
+               Iterator<ArchivedJson> iterator = archives.iterator();
+               ArchivedJson archive1 = iterator.next();
+               Assert.assertEquals(
+                       "/jobs/" + originalJob.getJobID() +
+                               "/vertices/" + originalTask.getJobVertexId() +
+                               "/subtasks/" + 
originalAttempt.getParallelSubtaskIndex(),
+                       archive1.getPath());
+               compareAttemptDetails(originalAttempt, archive1.getJson());
+
+               ArchivedJson archive2 = iterator.next();
+               Assert.assertEquals(
+                       "/jobs/" + originalJob.getJobID() +
+                               "/vertices/" + originalTask.getJobVertexId() +
+                               "/subtasks/" + 
originalAttempt.getParallelSubtaskIndex() +
+                               "/attempts/" + 
originalAttempt.getAttemptNumber(),
+                       archive2.getPath());
+               compareAttemptDetails(originalAttempt, archive2.getJson());
+       }
+
        @Test
        public void testGetPaths() {
                SubtaskExecutionAttemptDetailsHandler handler = new 
SubtaskExecutionAttemptDetailsHandler(null, null);
@@ -43,6 +79,10 @@ public class SubtaskExecutionAttemptDetailsHandlerTest {
                String json = 
SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson(
                        originalAttempt, originalJob.getJobID().toString(), 
originalTask.getJobVertexId().toString(), null);
 
+               compareAttemptDetails(originalAttempt, json);
+       }
+       
+       private static void compareAttemptDetails(AccessExecution 
originalAttempt, String json) throws IOException {
                JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
 
                Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), 
result.get("subtask").asInt());

http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
index 954ebad..dfbe618 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
@@ -19,13 +19,35 @@ package org.apache.flink.runtime.webmonitor.handlers;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.util.Collection;
+
 public class SubtasksAllAccumulatorsHandlerTest {
+
+       @Test
+       public void testArchiver() throws Exception {
+               JsonArchivist archivist = new 
SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist();
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
+
+               Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob);
+               Assert.assertEquals(1, archives.size());
+
+               ArchivedJson archive = archives.iterator().next();
+               Assert.assertEquals("/jobs/" + originalJob.getJobID() + 
"/vertices/" + originalTask.getJobVertexId() + 
+                       "/subtasks/accumulators", archive.getPath());
+               compareSubtaskAccumulators(originalTask, archive.getJson());
+       }
+
        @Test
        public void testGetPaths() {
                SubtasksAllAccumulatorsHandler handler = new 
SubtasksAllAccumulatorsHandler(null);
@@ -38,7 +60,10 @@ public class SubtasksAllAccumulatorsHandlerTest {
        public void testJsonGeneration() throws Exception {
                AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
                String json = 
SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(originalTask);
+               compareSubtaskAccumulators(originalTask, json);
+       }
 
+       private static void compareSubtaskAccumulators(AccessExecutionJobVertex 
originalTask, String json) throws IOException {
                JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
 
                Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());

Reply via email to