[FLINK-9194][history] Adjust handlers

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb06ba95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb06ba95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb06ba95

Branch: refs/heads/master
Commit: bb06ba954b8c65407039edd239da973d8e97e75b
Parents: 5753b74
Author: zentol <[email protected]>
Authored: Wed Apr 18 14:34:25 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Mon May 14 23:40:48 2018 +0200

----------------------------------------------------------------------
 .../handler/job/JobAccumulatorsHandler.java     | 22 ++++-
 .../rest/handler/job/JobConfigHandler.java      | 21 ++++-
 .../rest/handler/job/JobDetailsHandler.java     | 21 ++++-
 .../rest/handler/job/JobExceptionsHandler.java  | 21 ++++-
 .../rest/handler/job/JobPlanHandler.java        | 21 ++++-
 .../handler/job/JobVertexDetailsHandler.java    | 27 +++++-
 .../job/JobVertexTaskManagersHandler.java       | 29 ++++++-
 .../rest/handler/job/JobsOverviewHandler.java   | 19 ++++-
 ...taskExecutionAttemptAccumulatorsHandler.java | 50 ++++++++++-
 .../SubtaskExecutionAttemptDetailsHandler.java  | 59 ++++++++++++-
 .../rest/handler/job/SubtasksTimesHandler.java  | 27 +++++-
 .../checkpoints/CheckpointConfigHandler.java    | 28 ++++++-
 .../CheckpointStatisticDetailsHandler.java      | 33 +++++++-
 .../CheckpointingStatisticsHandler.java         | 26 +++++-
 .../TaskCheckpointStatisticDetailsHandler.java  | 87 ++++++++++++++------
 15 files changed, 447 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
index 964aee3..edb529c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java
@@ -28,13 +28,19 @@ import 
org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValue
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo;
 import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.OptionalFailure;
 import org.apache.flink.util.SerializedValue;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +50,7 @@ import java.util.concurrent.Executor;
 /**
  * Request handler that returns the aggregated accumulators of a job.
  */
-public class JobAccumulatorsHandler extends 
AbstractExecutionGraphHandler<JobAccumulatorsInfo, 
JobAccumulatorsMessageParameters> {
+public class JobAccumulatorsHandler extends 
AbstractExecutionGraphHandler<JobAccumulatorsInfo, 
JobAccumulatorsMessageParameters> implements JsonArchivist {
 
        public JobAccumulatorsHandler(
                        CompletableFuture<String> localRestAddress,
@@ -66,7 +72,6 @@ public class JobAccumulatorsHandler extends 
AbstractExecutionGraphHandler<JobAcc
 
        @Override
        protected JobAccumulatorsInfo 
handleRequest(HandlerRequest<EmptyRequestBody, 
JobAccumulatorsMessageParameters> request, AccessExecutionGraph graph) throws 
RestHandlerException {
-               JobAccumulatorsInfo accumulatorsInfo;
                List<Boolean> queryParams = 
request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class);
 
                final boolean includeSerializedValue;
@@ -76,6 +81,18 @@ public class JobAccumulatorsHandler extends 
AbstractExecutionGraphHandler<JobAcc
                        includeSerializedValue = false;
                }
 
+               return createJobAccumulatorsInfo(graph, includeSerializedValue);
+       }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               ResponseBody json = createJobAccumulatorsInfo(graph, true);
+               String path = getMessageHeaders().getTargetRestEndpointURL()
+                       .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString());
+               return Collections.singleton(new ArchivedJson(path, json));
+       }
+
+       private static JobAccumulatorsInfo 
createJobAccumulatorsInfo(AccessExecutionGraph graph, boolean 
includeSerializedValue) {
                StringifiedAccumulatorResult[] stringifiedAccs = 
graph.getAccumulatorResultsStringified();
                List<JobAccumulatorsInfo.UserTaskAccumulator> 
userTaskAccumulators = new ArrayList<>(stringifiedAccs.length);
 
@@ -87,6 +104,7 @@ public class JobAccumulatorsHandler extends 
AbstractExecutionGraphHandler<JobAcc
                                        acc.getValue()));
                }
 
+               JobAccumulatorsInfo accumulatorsInfo;
                if (includeSerializedValue) {
                        Map<String, SerializedValue<OptionalFailure<Object>>> 
serializedUserTaskAccumulators = graph.getAccumulatorsSerialized();
                        accumulatorsInfo = new 
JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, 
serializedUserTaskAccumulators);

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
index 7154246..3231668 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
@@ -25,11 +25,18 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobConfigInfo;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -37,7 +44,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler serving the job configuration.
  */
-public class JobConfigHandler extends 
AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters> {
+public class JobConfigHandler extends 
AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters> implements 
JsonArchivist {
 
        public JobConfigHandler(
                        CompletableFuture<String> localRestAddress,
@@ -60,6 +67,18 @@ public class JobConfigHandler extends 
AbstractExecutionGraphHandler<JobConfigInf
 
        @Override
        protected JobConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, 
JobMessageParameters> request, AccessExecutionGraph executionGraph) {
+               return createJobConfigInfo(executionGraph);
+       }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               ResponseBody json = createJobConfigInfo(graph);
+               String path = getMessageHeaders().getTargetRestEndpointURL()
+                       .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString());
+               return Collections.singleton(new ArchivedJson(path, json));
+       }
+
+       private static JobConfigInfo createJobConfigInfo(AccessExecutionGraph 
executionGraph) {
                final ArchivedExecutionConfig executionConfig = 
executionGraph.getArchivedExecutionConfig();
                final JobConfigInfo.ExecutionConfigInfo executionConfigInfo;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
index 82f24d3..d1383eb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java
@@ -32,16 +32,24 @@ import 
org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
 import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -50,7 +58,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler returning the details for the specified job.
  */
-public class JobDetailsHandler extends 
AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> {
+public class JobDetailsHandler extends 
AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> implements 
JsonArchivist {
 
        private final MetricFetcher<?> metricFetcher;
 
@@ -79,7 +87,18 @@ public class JobDetailsHandler extends 
AbstractExecutionGraphHandler<JobDetailsI
        protected JobDetailsInfo handleRequest(
                        HandlerRequest<EmptyRequestBody, JobMessageParameters> 
request,
                        AccessExecutionGraph executionGraph) throws 
RestHandlerException {
+               return createJobDetailsInfo(executionGraph, metricFetcher);
+       }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               ResponseBody json = createJobDetailsInfo(graph, null);
+               String path = getMessageHeaders().getTargetRestEndpointURL()
+                       .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString());
+               return Collections.singleton(new ArchivedJson(path, json));
+       }
 
+       private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph 
executionGraph, @Nullable MetricFetcher<?> metricFetcher) {
                final long now = System.currentTimeMillis();
                final long startTime = 
executionGraph.getStatusTimestamp(JobStatus.CREATED);
                final long endTime = 
executionGraph.getState().isGloballyTerminalState() ?

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index 63dc604..8ec1af0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -27,14 +27,21 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobExceptionsInfo;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.ExceptionUtils;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -43,7 +50,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler serving the job exceptions.
  */
-public class JobExceptionsHandler extends 
AbstractExecutionGraphHandler<JobExceptionsInfo, JobMessageParameters> {
+public class JobExceptionsHandler extends 
AbstractExecutionGraphHandler<JobExceptionsInfo, JobMessageParameters> 
implements JsonArchivist {
 
        static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
 
@@ -68,6 +75,18 @@ public class JobExceptionsHandler extends 
AbstractExecutionGraphHandler<JobExcep
 
        @Override
        protected JobExceptionsInfo 
handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, 
AccessExecutionGraph executionGraph) {
+               return createJobExceptionsInfo(executionGraph);
+       }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               ResponseBody json = createJobExceptionsInfo(graph);
+               String path = getMessageHeaders().getTargetRestEndpointURL()
+                       .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString());
+               return Collections.singletonList(new ArchivedJson(path, json));
+       }
+
+       private static JobExceptionsInfo 
createJobExceptionsInfo(AccessExecutionGraph executionGraph) {
                ErrorInfo rootException = executionGraph.getFailureInfo();
                String rootExceptionMessage = null;
                Long rootTimestamp = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
index e7a30fb..6eb2573 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
@@ -23,12 +23,19 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobPlanInfo;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -36,7 +43,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler serving the job execution plan.
  */
-public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, 
JobMessageParameters> {
+public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, 
JobMessageParameters> implements JsonArchivist {
 
        public JobPlanHandler(
                CompletableFuture<String> localRestAddress,
@@ -59,6 +66,18 @@ public class JobPlanHandler extends 
AbstractExecutionGraphHandler<JobPlanInfo, J
 
        @Override
        protected JobPlanInfo handleRequest(HandlerRequest<EmptyRequestBody, 
JobMessageParameters> request, AccessExecutionGraph executionGraph) throws 
RestHandlerException {
+               return createJobPlanInfo(executionGraph);
+       }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               ResponseBody json = createJobPlanInfo(graph);
+               String path = getMessageHeaders().getTargetRestEndpointURL()
+                       .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString());
+               return Collections.singleton(new ArchivedJson(path, json));
+       }
+
+       private static JobPlanInfo createJobPlanInfo(AccessExecutionGraph 
executionGraph) {
                return new JobPlanInfo(executionGraph.getJsonPlan());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
index b4693a5..034f01d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java
@@ -36,12 +36,19 @@ import 
org.apache.flink.runtime.rest.messages.JobVertexDetailsInfo;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -50,7 +57,7 @@ import java.util.concurrent.Executor;
 /**
  * Request handler for the job vertex details.
  */
-public class JobVertexDetailsHandler extends 
AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters> 
{
+public class JobVertexDetailsHandler extends 
AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters> 
implements JsonArchivist {
        private final MetricFetcher<? extends RestfulGateway> metricFetcher;
 
        public JobVertexDetailsHandler(
@@ -85,6 +92,24 @@ public class JobVertexDetailsHandler extends 
AbstractExecutionGraphHandler<JobVe
                        throw new NotFoundException(String.format("JobVertex %s 
not found", jobVertexID));
                }
 
+               return createJobVertexDetailsInfo(jobVertex, jobID, 
metricFetcher);
+       }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               Collection<? extends AccessExecutionJobVertex> vertices = 
graph.getAllVertices().values();
+               List<ArchivedJson> archive = new ArrayList<>(vertices.size());
+               for (AccessExecutionJobVertex task : vertices) {
+                       ResponseBody json = createJobVertexDetailsInfo(task, 
graph.getJobID(), null);
+                       String path = 
getMessageHeaders().getTargetRestEndpointURL()
+                               .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString())
+                               .replace(':' + JobVertexIdPathParameter.KEY, 
task.getJobVertexId().toString());
+                       archive.add(new ArchivedJson(path, json));
+               }
+               return archive;
+       }
+
+       private static JobVertexDetailsInfo 
createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, 
@Nullable MetricFetcher<?> metricFetcher) {
                List<JobVertexDetailsInfo.VertexTaskDetail> subtasks = new 
ArrayList<>();
                final long now = System.currentTimeMillis();
                int num = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 24650a3..efb6fc0 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -38,13 +38,20 @@ import 
org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersInfo;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -55,7 +62,7 @@ import java.util.concurrent.Executor;
  * A request handler that provides the details of a job vertex, including id, 
name, and the
  * runtime and metrics of all its subtasks aggregated by TaskManager.
  */
-public class JobVertexTaskManagersHandler extends 
AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, 
JobVertexMessageParameters> {
+public class JobVertexTaskManagersHandler extends 
AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, 
JobVertexMessageParameters> implements JsonArchivist {
        private MetricFetcher<?> metricFetcher;
 
        public JobVertexTaskManagersHandler(
@@ -83,6 +90,24 @@ public class JobVertexTaskManagersHandler extends 
AbstractExecutionGraphHandler<
                        throw new NotFoundException(String.format("JobVertex %s 
not found", jobVertexID));
                }
 
+               return createJobVertexTaskManagersInfo(jobVertex, jobID, 
metricFetcher);
+       }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               Collection<? extends AccessExecutionJobVertex> vertices = 
graph.getAllVertices().values();
+               List<ArchivedJson> archive = new ArrayList<>(vertices.size());
+               for (AccessExecutionJobVertex task : vertices) {
+                       ResponseBody json = 
createJobVertexTaskManagersInfo(task, graph.getJobID(), null);
+                       String path = 
getMessageHeaders().getTargetRestEndpointURL()
+                               .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString())
+                               .replace(':' + JobVertexIdPathParameter.KEY, 
task.getJobVertexId().toString());
+                       archive.add(new ArchivedJson(path, json));
+               }
+               return archive;
+       }
+
+       private static JobVertexTaskManagersInfo 
createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID 
jobID, @Nullable MetricFetcher<?> metricFetcher) {
                // Build a map that groups tasks by TaskManager
                Map<String, List<AccessExecutionVertex>> taskManagerVertices = 
new HashMap<>();
                for (AccessExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
@@ -173,6 +198,6 @@ public class JobVertexTaskManagersHandler extends 
AbstractExecutionGraphHandler<
                                statusCounts));
                }
 
-               return new JobVertexTaskManagersInfo(jobVertexID, 
jobVertex.getName(), now, taskManagersInfoList);
+               return new 
JobVertexTaskManagersInfo(jobVertex.getJobVertexId(), jobVertex.getName(), now, 
taskManagersInfoList);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
index 94bdbd2..6d2b1e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java
@@ -19,25 +19,34 @@
 package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import javax.annotation.Nonnull;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
  * Overview handler for jobs.
  */
-public class JobsOverviewHandler extends AbstractRestHandler<RestfulGateway, 
EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> {
+public class JobsOverviewHandler extends AbstractRestHandler<RestfulGateway, 
EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> implements 
JsonArchivist {
 
        public JobsOverviewHandler(
                        CompletableFuture<String> localRestAddress,
@@ -57,4 +66,12 @@ public class JobsOverviewHandler extends 
AbstractRestHandler<RestfulGateway, Emp
        protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull 
HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull 
RestfulGateway gateway) throws RestHandlerException {
                return gateway.requestMultipleJobDetails(timeout);
        }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               ResponseBody json = new 
MultipleJobsDetails(Collections.singleton(WebMonitorUtils.createDetailsForJob(graph)));
+               String path = getMessageHeaders().getTargetRestEndpointURL()
+                       .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString());
+               return Collections.singletonList(new ArchivedJson(path, json));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
index e3b1719..e335238 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -21,18 +21,31 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
 import 
org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter;
 import 
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsInfo;
 import org.apache.flink.runtime.rest.messages.job.UserAccumulator;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -40,7 +53,10 @@ import java.util.concurrent.Executor;
 /**
  * Request handler for the subtask execution attempt accumulators.
  */
-public class SubtaskExecutionAttemptAccumulatorsHandler extends 
AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptAccumulatorsInfo, 
SubtaskAttemptMessageParameters> {
+public class SubtaskExecutionAttemptAccumulatorsHandler
+       extends 
AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptAccumulatorsInfo, 
SubtaskAttemptMessageParameters>
+       implements JsonArchivist {
+
        /**
         * Instantiates a new Abstract job vertex handler.
         *
@@ -68,7 +84,39 @@ public class SubtaskExecutionAttemptAccumulatorsHandler 
extends AbstractSubtaskA
        protected SubtaskExecutionAttemptAccumulatorsInfo handleRequest(
                        HandlerRequest<EmptyRequestBody, 
SubtaskAttemptMessageParameters> request,
                        AccessExecution execution) throws RestHandlerException {
+               return createAccumulatorInfo(execution);
+       }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               List<ArchivedJson> archive = new ArrayList<>(16);
+               for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
+                       for (AccessExecutionVertex subtask : 
task.getTaskVertices()) {
+                               ResponseBody curAttemptJson = 
createAccumulatorInfo(subtask.getCurrentExecutionAttempt());
+                               String curAttemptPath = 
getMessageHeaders().getTargetRestEndpointURL()
+                                       .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString())
+                                       .replace(':' + 
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
+                                       .replace(':' + 
SubtaskIndexPathParameter.KEY, 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                       .replace(':' + 
SubtaskAttemptPathParameter.KEY, 
String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber()));
+
+                               archive.add(new ArchivedJson(curAttemptPath, 
curAttemptJson));
+
+                               for (int x = 0; x < 
subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
+                                       AccessExecution attempt = 
subtask.getPriorExecutionAttempt(x);
+                                       ResponseBody json = 
createAccumulatorInfo(attempt);
+                                       String path = 
getMessageHeaders().getTargetRestEndpointURL()
+                                               .replace(':' + 
JobIDPathParameter.KEY, graph.getJobID().toString())
+                                               .replace(':' + 
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
+                                               .replace(':' + 
SubtaskIndexPathParameter.KEY, 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                               .replace(':' + 
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
+                                       archive.add(new ArchivedJson(path, 
json));
+                               }
+                       }
+               }
+               return archive;
+       }
 
+       private static SubtaskExecutionAttemptAccumulatorsInfo 
createAccumulatorInfo(AccessExecution execution) {
                final StringifiedAccumulatorResult[] accs = 
execution.getUserAccumulatorsStringified();
                final ArrayList<UserAccumulator> userAccumulatorList = new 
ArrayList<>(accs.length);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
index b781ee7..bae80c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java
@@ -21,6 +21,9 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -31,12 +34,23 @@ import 
org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter;
 import 
org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters;
+import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter;
 import 
org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -44,7 +58,9 @@ import java.util.concurrent.Executor;
 /**
  * Handler of specific sub task execution attempt.
  */
-public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, 
SubtaskAttemptMessageParameters> {
+public class SubtaskExecutionAttemptDetailsHandler
+       extends 
AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, 
SubtaskAttemptMessageParameters>
+       implements JsonArchivist {
 
        private final MetricFetcher<?> metricFetcher;
 
@@ -79,11 +95,48 @@ public class SubtaskExecutionAttemptDetailsHandler extends 
AbstractSubtaskAttemp
                        HandlerRequest<EmptyRequestBody, 
SubtaskAttemptMessageParameters> request,
                        AccessExecution execution) throws RestHandlerException {
 
-               final MutableIOMetrics ioMetrics = new MutableIOMetrics();
-
                final JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
                final JobVertexID jobVertexID = 
request.getPathParameter(JobVertexIdPathParameter.class);
 
+               return createDetailsInfo(execution, jobID, jobVertexID, 
metricFetcher);
+       }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               List<ArchivedJson> archive = new ArrayList<>(16);
+               for (AccessExecutionJobVertex task : 
graph.getAllVertices().values()) {
+                       for (AccessExecutionVertex subtask : 
task.getTaskVertices()) {
+                               ResponseBody curAttemptJson = 
createDetailsInfo(subtask.getCurrentExecutionAttempt(), graph.getJobID(), 
task.getJobVertexId(), null);
+                               String curAttemptPath = 
getMessageHeaders().getTargetRestEndpointURL()
+                                       .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString())
+                                       .replace(':' + 
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
+                                       .replace(':' + 
SubtaskIndexPathParameter.KEY, 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                       .replace(':' + 
SubtaskAttemptPathParameter.KEY, 
String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber()));
+
+                               archive.add(new ArchivedJson(curAttemptPath, 
curAttemptJson));
+
+                               for (int x = 0; x < 
subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) {
+                                       AccessExecution attempt = 
subtask.getPriorExecutionAttempt(x);
+                                       ResponseBody json = 
createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null);
+                                       String path = 
getMessageHeaders().getTargetRestEndpointURL()
+                                               .replace(':' + 
JobIDPathParameter.KEY, graph.getJobID().toString())
+                                               .replace(':' + 
JobVertexIdPathParameter.KEY, task.getJobVertexId().toString())
+                                               .replace(':' + 
SubtaskIndexPathParameter.KEY, 
String.valueOf(subtask.getParallelSubtaskIndex()))
+                                               .replace(':' + 
SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber()));
+                                       archive.add(new ArchivedJson(path, 
json));
+                               }
+                       }
+               }
+               return archive;
+       }
+
+       private static SubtaskExecutionAttemptDetailsInfo createDetailsInfo(
+                       AccessExecution execution,
+                       JobID jobID,
+                       JobVertexID jobVertexID,
+                       @Nullable MetricFetcher<?> metricFetcher) {
+               final MutableIOMetrics ioMetrics = new MutableIOMetrics();
+
                ioMetrics.addIOMetrics(
                        execution,
                        metricFetcher,

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
index 29c0f93..5026951 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
@@ -20,19 +20,27 @@ package org.apache.flink.runtime.rest.handler.job;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.SubtasksTimesInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -42,7 +50,7 @@ import java.util.concurrent.Executor;
 /**
  * Request handler for the subtasks times info.
  */
-public class SubtasksTimesHandler extends 
AbstractJobVertexHandler<SubtasksTimesInfo, JobVertexMessageParameters>  {
+public class SubtasksTimesHandler extends 
AbstractJobVertexHandler<SubtasksTimesInfo, JobVertexMessageParameters> 
implements JsonArchivist {
        public SubtasksTimesHandler(
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
@@ -63,7 +71,24 @@ public class SubtasksTimesHandler extends 
AbstractJobVertexHandler<SubtasksTimes
 
        @Override
        protected SubtasksTimesInfo 
handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> 
request, AccessExecutionJobVertex jobVertex) {
+               return createSubtaskTimesInfo(jobVertex);
+       }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               Collection<? extends AccessExecutionJobVertex> allVertices = 
graph.getAllVertices().values();
+               List<ArchivedJson> archive = new 
ArrayList<>(allVertices.size());
+               for (AccessExecutionJobVertex task : allVertices) {
+                       ResponseBody json = createSubtaskTimesInfo(task);
+                       String path = 
getMessageHeaders().getTargetRestEndpointURL()
+                               .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString())
+                               .replace(':' + JobVertexIdPathParameter.KEY, 
task.getJobVertexId().toString());
+                       archive.add(new ArchivedJson(path, json));
+               }
+               return archive;
+       }
 
+       private static SubtasksTimesInfo 
createSubtaskTimesInfo(AccessExecutionJobVertex jobVertex) {
                final String id = jobVertex.getJobVertexId().toString();
                final String name = jobVertex.getName();
                final long now = System.currentTimeMillis();

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index b88183e..3a03575 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -27,14 +27,23 @@ import 
org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -42,7 +51,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler which serves the checkpoint configuration.
  */
-public class CheckpointConfigHandler extends 
AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> {
+public class CheckpointConfigHandler extends 
AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> 
implements JsonArchivist {
 
        public CheckpointConfigHandler(
                        CompletableFuture<String> localRestAddress,
@@ -64,6 +73,23 @@ public class CheckpointConfigHandler extends 
AbstractExecutionGraphHandler<Check
 
        @Override
        protected CheckpointConfigInfo 
handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, 
AccessExecutionGraph executionGraph) throws RestHandlerException {
+               return createCheckpointConfigInfo(executionGraph);
+       }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               ResponseBody response;
+               try {
+                       response = createCheckpointConfigInfo(graph);
+               } catch (RestHandlerException rhe) {
+                       response = new ErrorResponseBody(rhe.getMessage());
+               }
+               String path = 
CheckpointConfigHeaders.getInstance().getTargetRestEndpointURL()
+                       .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString());
+               return Collections.singletonList(new ArchivedJson(path, 
response));
+       }
+
+       private static CheckpointConfigInfo 
createCheckpointConfigInfo(AccessExecutionGraph executionGraph) throws 
RestHandlerException {
                final CheckpointCoordinatorConfiguration 
checkpointCoordinatorConfiguration = 
executionGraph.getCheckpointCoordinatorConfiguration();
 
                if (checkpointCoordinatorConfiguration == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
index 2816336..8f2d713 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
@@ -20,15 +20,28 @@ package 
org.apache.flink.runtime.rest.handler.job.checkpoints;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -36,7 +49,7 @@ import java.util.concurrent.Executor;
 /**
  * REST handler which returns the details for a checkpoint.
  */
-public class CheckpointStatisticDetailsHandler extends 
AbstractCheckpointHandler<CheckpointStatistics, CheckpointMessageParameters> {
+public class CheckpointStatisticDetailsHandler extends 
AbstractCheckpointHandler<CheckpointStatistics, CheckpointMessageParameters> 
implements JsonArchivist {
 
        public CheckpointStatisticDetailsHandler(
                        CompletableFuture<String> localRestAddress,
@@ -62,4 +75,22 @@ public class CheckpointStatisticDetailsHandler extends 
AbstractCheckpointHandler
        protected CheckpointStatistics 
handleCheckpointRequest(HandlerRequest<EmptyRequestBody, 
CheckpointMessageParameters> ignored, AbstractCheckpointStats checkpointStats) {
                return 
CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true);
        }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               CheckpointStatsSnapshot stats = 
graph.getCheckpointStatsSnapshot();
+               if (stats == null) {
+                       return Collections.emptyList();
+               }
+               CheckpointStatsHistory history = stats.getHistory();
+               List<ArchivedJson> archive = new 
ArrayList<>(history.getCheckpoints().size());
+               for (AbstractCheckpointStats checkpoint : 
history.getCheckpoints()) {
+                       ResponseBody json = 
CheckpointStatistics.generateCheckpointStatistics(checkpoint, true);
+                       String path = 
getMessageHeaders().getTargetRestEndpointURL()
+                               .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString())
+                               .replace(':' + CheckpointIdPathParameter.KEY, 
String.valueOf(checkpoint.getCheckpointId()));
+                       archive.add(new ArchivedJson(path, json));
+               }
+               return archive;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
index b9db367..d4345ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
@@ -32,17 +32,25 @@ import 
org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
 import org.apache.flink.runtime.rest.messages.JobMessageParameters;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics;
 import org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -51,7 +59,7 @@ import java.util.concurrent.Executor;
 /**
  * Handler which serves the checkpoint statistics.
  */
-public class CheckpointingStatisticsHandler extends 
AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> {
+public class CheckpointingStatisticsHandler extends 
AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> 
implements JsonArchivist {
 
        public CheckpointingStatisticsHandler(
                        CompletableFuture<String> localRestAddress,
@@ -66,7 +74,23 @@ public class CheckpointingStatisticsHandler extends 
AbstractExecutionGraphHandle
 
        @Override
        protected CheckpointingStatistics 
handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, 
AccessExecutionGraph executionGraph) throws RestHandlerException {
+               return createCheckpointingStatistics(executionGraph);
+       }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               ResponseBody json;
+               try {
+                       json = createCheckpointingStatistics(graph);
+               } catch (RestHandlerException rhe) {
+                       json = new ErrorResponseBody(rhe.getMessage());
+               }
+               String path = getMessageHeaders().getTargetRestEndpointURL()
+                       .replace(':' + JobIDPathParameter.KEY, 
graph.getJobID().toString());
+               return Collections.singletonList(new ArchivedJson(path, json));
+       }
 
+       private static CheckpointingStatistics 
createCheckpointingStatistics(AccessExecutionGraph executionGraph) throws 
RestHandlerException {
                final CheckpointStatsSnapshot checkpointStatsSnapshot = 
executionGraph.getCheckpointStatsSnapshot();
 
                if (checkpointStatsSnapshot == null) {

http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
index cff3bf0..2084c50 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
@@ -20,26 +20,35 @@ package 
org.apache.flink.runtime.rest.handler.job.checkpoints;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory;
+import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot;
 import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
 import org.apache.flink.runtime.checkpoint.SubtaskStateStats;
 import org.apache.flink.runtime.checkpoint.TaskStateStats;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
 import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.ResponseBody;
+import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter;
 import org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.SubtaskCheckpointStatistics;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointMessageParameters;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsWithSubtaskDetails;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
+import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
-import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
-
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
@@ -48,7 +57,9 @@ import java.util.concurrent.Executor;
 /**
  * REST handler which serves checkpoint statistics for subtasks.
  */
-public class TaskCheckpointStatisticDetailsHandler extends 
AbstractCheckpointHandler<TaskCheckpointStatisticsWithSubtaskDetails, 
TaskCheckpointMessageParameters> {
+public class TaskCheckpointStatisticDetailsHandler
+       extends 
AbstractCheckpointHandler<TaskCheckpointStatisticsWithSubtaskDetails, 
TaskCheckpointMessageParameters>
+       implements JsonArchivist {
 
        public TaskCheckpointStatisticDetailsHandler(
                        CompletableFuture<String> localRestAddress,
@@ -79,30 +90,54 @@ public class TaskCheckpointStatisticDetailsHandler extends 
AbstractCheckpointHan
 
                final TaskStateStats taskStatistics = 
checkpointStats.getTaskStateStats(jobVertexId);
 
-               if (taskStatistics != null) {
-
-                       final 
TaskCheckpointStatisticsWithSubtaskDetails.Summary summary = createSummary(
-                               taskStatistics.getSummaryStats(),
-                               checkpointStats.getTriggerTimestamp());
-
-                       final List<SubtaskCheckpointStatistics> 
subtaskCheckpointStatistics = createSubtaskCheckpointStatistics(
-                               taskStatistics.getSubtaskStats(),
-                               checkpointStats.getTriggerTimestamp());
-
-                       return new TaskCheckpointStatisticsWithSubtaskDetails(
-                               checkpointStats.getCheckpointId(),
-                               checkpointStats.getStatus(),
-                               taskStatistics.getLatestAckTimestamp(),
-                               taskStatistics.getStateSize(),
-                               
taskStatistics.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
-                               taskStatistics.getAlignmentBuffered(),
-                               taskStatistics.getNumberOfSubtasks(),
-                               
taskStatistics.getNumberOfAcknowledgedSubtasks(),
-                               summary,
-                               subtaskCheckpointStatistics);
-               } else {
-                       throw new RestHandlerException("There is no checkpoint 
statistics for task " + jobVertexId + '.', HttpResponseStatus.NOT_FOUND);
+               if (taskStatistics == null) {
+                       throw new NotFoundException("There is no checkpoint 
statistics for task " + jobVertexId + '.');
+               }
+
+               return createCheckpointDetails(checkpointStats, taskStatistics);
+       }
+
+       @Override
+       public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException {
+               CheckpointStatsSnapshot stats = 
graph.getCheckpointStatsSnapshot();
+               if (stats == null) {
+                       return Collections.emptyList();
+               }
+               CheckpointStatsHistory history = stats.getHistory();
+               List<ArchivedJson> archive = new 
ArrayList<>(history.getCheckpoints().size());
+               for (AbstractCheckpointStats checkpoint : 
history.getCheckpoints()) {
+                       for (TaskStateStats subtaskStats : 
checkpoint.getAllTaskStateStats()) {
+                               ResponseBody json = 
createCheckpointDetails(checkpoint, subtaskStats);
+                               String path = 
getMessageHeaders().getTargetRestEndpointURL()
+                                       .replace(':' + 
JobVertexIdPathParameter.KEY, graph.getJobID().toString())
+                                       .replace(':' + 
CheckpointIdPathParameter.KEY, String.valueOf(checkpoint.getCheckpointId()))
+                                       .replace(':' + 
JobVertexIdPathParameter.KEY, subtaskStats.getJobVertexId().toString());
+                               archive.add(new ArchivedJson(path, json));
+                       }
                }
+               return archive;
+       }
+
+       private static TaskCheckpointStatisticsWithSubtaskDetails 
createCheckpointDetails(AbstractCheckpointStats checkpointStats, TaskStateStats 
taskStatistics) {
+               final TaskCheckpointStatisticsWithSubtaskDetails.Summary 
summary = createSummary(
+                       taskStatistics.getSummaryStats(),
+                       checkpointStats.getTriggerTimestamp());
+
+               final List<SubtaskCheckpointStatistics> 
subtaskCheckpointStatistics = createSubtaskCheckpointStatistics(
+                       taskStatistics.getSubtaskStats(),
+                       checkpointStats.getTriggerTimestamp());
+
+               return new TaskCheckpointStatisticsWithSubtaskDetails(
+                       checkpointStats.getCheckpointId(),
+                       checkpointStats.getStatus(),
+                       taskStatistics.getLatestAckTimestamp(),
+                       taskStatistics.getStateSize(),
+                       
taskStatistics.getEndToEndDuration(checkpointStats.getTriggerTimestamp()),
+                       taskStatistics.getAlignmentBuffered(),
+                       taskStatistics.getNumberOfSubtasks(),
+                       taskStatistics.getNumberOfAcknowledgedSubtasks(),
+                       summary,
+                       subtaskCheckpointStatistics);
        }
 
        private static TaskCheckpointStatisticsWithSubtaskDetails.Summary 
createSummary(TaskStateStats.TaskStateStatsSummary taskStatisticsSummary, long 
triggerTimestamp) {

Reply via email to