[FLINK-9194][history] Adjust handlers
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bb06ba95 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bb06ba95 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bb06ba95 Branch: refs/heads/master Commit: bb06ba954b8c65407039edd239da973d8e97e75b Parents: 5753b74 Author: zentol <[email protected]> Authored: Wed Apr 18 14:34:25 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Mon May 14 23:40:48 2018 +0200 ---------------------------------------------------------------------- .../handler/job/JobAccumulatorsHandler.java | 22 ++++- .../rest/handler/job/JobConfigHandler.java | 21 ++++- .../rest/handler/job/JobDetailsHandler.java | 21 ++++- .../rest/handler/job/JobExceptionsHandler.java | 21 ++++- .../rest/handler/job/JobPlanHandler.java | 21 ++++- .../handler/job/JobVertexDetailsHandler.java | 27 +++++- .../job/JobVertexTaskManagersHandler.java | 29 ++++++- .../rest/handler/job/JobsOverviewHandler.java | 19 ++++- ...taskExecutionAttemptAccumulatorsHandler.java | 50 ++++++++++- .../SubtaskExecutionAttemptDetailsHandler.java | 59 ++++++++++++- .../rest/handler/job/SubtasksTimesHandler.java | 27 +++++- .../checkpoints/CheckpointConfigHandler.java | 28 ++++++- .../CheckpointStatisticDetailsHandler.java | 33 +++++++- .../CheckpointingStatisticsHandler.java | 26 +++++- .../TaskCheckpointStatisticDetailsHandler.java | 87 ++++++++++++++------ 15 files changed, 447 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java index 964aee3..edb529c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobAccumulatorsHandler.java @@ -28,13 +28,19 @@ import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValue import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobAccumulatorsInfo; import org.apache.flink.runtime.rest.messages.JobAccumulatorsMessageParameters; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.OptionalFailure; import org.apache.flink.util.SerializedValue; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Map; @@ -44,7 +50,7 @@ import java.util.concurrent.Executor; /** * Request handler that returns the aggregated accumulators of a job. 
*/ -public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobAccumulatorsMessageParameters> { +public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAccumulatorsInfo, JobAccumulatorsMessageParameters> implements JsonArchivist { public JobAccumulatorsHandler( CompletableFuture<String> localRestAddress, @@ -66,7 +72,6 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAcc @Override protected JobAccumulatorsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobAccumulatorsMessageParameters> request, AccessExecutionGraph graph) throws RestHandlerException { - JobAccumulatorsInfo accumulatorsInfo; List<Boolean> queryParams = request.getQueryParameter(AccumulatorsIncludeSerializedValueQueryParameter.class); final boolean includeSerializedValue; @@ -76,6 +81,18 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAcc includeSerializedValue = false; } + return createJobAccumulatorsInfo(graph, includeSerializedValue); + } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + ResponseBody json = createJobAccumulatorsInfo(graph, true); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()); + return Collections.singleton(new ArchivedJson(path, json)); + } + + private static JobAccumulatorsInfo createJobAccumulatorsInfo(AccessExecutionGraph graph, boolean includeSerializedValue) { StringifiedAccumulatorResult[] stringifiedAccs = graph.getAccumulatorResultsStringified(); List<JobAccumulatorsInfo.UserTaskAccumulator> userTaskAccumulators = new ArrayList<>(stringifiedAccs.length); @@ -87,6 +104,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphHandler<JobAcc acc.getValue())); } + JobAccumulatorsInfo accumulatorsInfo; if (includeSerializedValue) { Map<String, SerializedValue<OptionalFailure<Object>>> 
serializedUserTaskAccumulators = graph.getAccumulatorsSerialized(); accumulatorsInfo = new JobAccumulatorsInfo(Collections.emptyList(), userTaskAccumulators, serializedUserTaskAccumulators); http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java index 7154246..3231668 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java @@ -25,11 +25,18 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobConfigInfo; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -37,7 +44,7 @@ import java.util.concurrent.Executor; /** * Handler serving the job configuration. 
*/ -public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters> { +public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInfo, JobMessageParameters> implements JsonArchivist { public JobConfigHandler( CompletableFuture<String> localRestAddress, @@ -60,6 +67,18 @@ public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInf @Override protected JobConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) { + return createJobConfigInfo(executionGraph); + } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + ResponseBody json = createJobConfigInfo(graph); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()); + return Collections.singleton(new ArchivedJson(path, json)); + } + + private static JobConfigInfo createJobConfigInfo(AccessExecutionGraph executionGraph) { final ArchivedExecutionConfig executionConfig = executionGraph.getArchivedExecutionConfig(); final JobConfigInfo.ExecutionConfigInfo executionConfigInfo; http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java index 82f24d3..d1383eb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobDetailsHandler.java @@ -32,16 +32,24 @@ import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import 
org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + +import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -50,7 +58,7 @@ import java.util.concurrent.Executor; /** * Handler returning the details for the specified job. 
*/ -public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> { +public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsInfo, JobMessageParameters> implements JsonArchivist { private final MetricFetcher<?> metricFetcher; @@ -79,7 +87,18 @@ public class JobDetailsHandler extends AbstractExecutionGraphHandler<JobDetailsI protected JobDetailsInfo handleRequest( HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException { + return createJobDetailsInfo(executionGraph, metricFetcher); + } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + ResponseBody json = createJobDetailsInfo(graph, null); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()); + return Collections.singleton(new ArchivedJson(path, json)); + } + private static JobDetailsInfo createJobDetailsInfo(AccessExecutionGraph executionGraph, @Nullable MetricFetcher<?> metricFetcher) { final long now = System.currentTimeMillis(); final long startTime = executionGraph.getStatusTimestamp(JobStatus.CREATED); final long endTime = executionGraph.getState().isGloballyTerminalState() ? 
http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java index 63dc604..8ec1af0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java @@ -27,14 +27,21 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobExceptionsInfo; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.ExceptionUtils; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -43,7 +50,7 @@ import java.util.concurrent.Executor; /** * Handler serving the job exceptions. 
*/ -public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExceptionsInfo, JobMessageParameters> { +public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExceptionsInfo, JobMessageParameters> implements JsonArchivist { static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; @@ -68,6 +75,18 @@ public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExcep @Override protected JobExceptionsInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) { + return createJobExceptionsInfo(executionGraph); + } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + ResponseBody json = createJobExceptionsInfo(graph); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()); + return Collections.singletonList(new ArchivedJson(path, json)); + } + + private static JobExceptionsInfo createJobExceptionsInfo(AccessExecutionGraph executionGraph) { ErrorInfo rootException = executionGraph.getFailureInfo(); String rootExceptionMessage = null; Long rootTimestamp = null; http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java index e7a30fb..6eb2573 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java @@ -23,12 +23,19 @@ import org.apache.flink.runtime.rest.handler.HandlerRequest; import 
org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.JobPlanInfo; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -36,7 +43,7 @@ import java.util.concurrent.Executor; /** * Handler serving the job execution plan. 
*/ -public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters> { +public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, JobMessageParameters> implements JsonArchivist { public JobPlanHandler( CompletableFuture<String> localRestAddress, @@ -59,6 +66,18 @@ public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, J @Override protected JobPlanInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException { + return createJobPlanInfo(executionGraph); + } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + ResponseBody json = createJobPlanInfo(graph); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()); + return Collections.singleton(new ArchivedJson(path, json)); + } + + private static JobPlanInfo createJobPlanInfo(AccessExecutionGraph executionGraph) { return new JobPlanInfo(executionGraph.getJsonPlan()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java index b4693a5..034f01d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexDetailsHandler.java @@ -36,12 +36,19 @@ import org.apache.flink.runtime.rest.messages.JobVertexDetailsInfo; import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; import 
org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import javax.annotation.Nullable; + +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -50,7 +57,7 @@ import java.util.concurrent.Executor; /** * Request handler for the job vertex details. */ -public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters> { +public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVertexDetailsInfo, JobVertexMessageParameters> implements JsonArchivist { private final MetricFetcher<? extends RestfulGateway> metricFetcher; public JobVertexDetailsHandler( @@ -85,6 +92,24 @@ public class JobVertexDetailsHandler extends AbstractExecutionGraphHandler<JobVe throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID)); } + return createJobVertexDetailsInfo(jobVertex, jobID, metricFetcher); + } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + Collection<? 
extends AccessExecutionJobVertex> vertices = graph.getAllVertices().values(); + List<ArchivedJson> archive = new ArrayList<>(vertices.size()); + for (AccessExecutionJobVertex task : vertices) { + ResponseBody json = createJobVertexDetailsInfo(task, graph.getJobID(), null); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()) + .replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + + private static JobVertexDetailsInfo createJobVertexDetailsInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) { List<JobVertexDetailsInfo.VertexTaskDetail> subtasks = new ArrayList<>(); final long now = System.currentTimeMillis(); int num = 0; http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java index 24650a3..efb6fc0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java @@ -38,13 +38,20 @@ import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersInfo; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; 
import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -55,7 +62,7 @@ import java.util.concurrent.Executor; * A request handler that provides the details of a job vertex, including id, name, and the * runtime and metrics of all its subtasks aggregated by TaskManager. */ -public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> { +public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, JobVertexMessageParameters> implements JsonArchivist { private MetricFetcher<?> metricFetcher; public JobVertexTaskManagersHandler( @@ -83,6 +90,24 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler< throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID)); } + return createJobVertexTaskManagersInfo(jobVertex, jobID, metricFetcher); + } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + Collection<? 
extends AccessExecutionJobVertex> vertices = graph.getAllVertices().values(); + List<ArchivedJson> archive = new ArrayList<>(vertices.size()); + for (AccessExecutionJobVertex task : vertices) { + ResponseBody json = createJobVertexTaskManagersInfo(task, graph.getJobID(), null); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()) + .replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + + private static JobVertexTaskManagersInfo createJobVertexTaskManagersInfo(AccessExecutionJobVertex jobVertex, JobID jobID, @Nullable MetricFetcher<?> metricFetcher) { // Build a map that groups tasks by TaskManager Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>(); for (AccessExecutionVertex vertex : jobVertex.getTaskVertices()) { @@ -173,6 +198,6 @@ public class JobVertexTaskManagersHandler extends AbstractExecutionGraphHandler< statusCounts)); } - return new JobVertexTaskManagersInfo(jobVertexID, jobVertex.getName(), now, taskManagersInfoList); + return new JobVertexTaskManagersInfo(jobVertex.getJobVertexId(), jobVertex.getName(), now, taskManagersInfoList); } } http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java index 94bdbd2..6d2b1e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobsOverviewHandler.java @@ -19,25 +19,34 @@ package 
org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.rest.handler.AbstractRestHandler; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import javax.annotation.Nonnull; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; /** * Overview handler for jobs. 
*/ -public class JobsOverviewHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> { +public class JobsOverviewHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, MultipleJobsDetails, EmptyMessageParameters> implements JsonArchivist { public JobsOverviewHandler( CompletableFuture<String> localRestAddress, @@ -57,4 +66,12 @@ public class JobsOverviewHandler extends AbstractRestHandler<RestfulGateway, Emp protected CompletableFuture<MultipleJobsDetails> handleRequest(@Nonnull HandlerRequest<EmptyRequestBody, EmptyMessageParameters> request, @Nonnull RestfulGateway gateway) throws RestHandlerException { return gateway.requestMultipleJobDetails(timeout); } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + ResponseBody json = new MultipleJobsDetails(Collections.singleton(WebMonitorUtils.createDetailsForJob(graph))); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()); + return Collections.singletonList(new ArchivedJson(path, json)); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java index e3b1719..e335238 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -21,18 +21,31 @@ package 
org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter; import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters; +import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter; import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptAccumulatorsInfo; import org.apache.flink.runtime.rest.messages.job.UserAccumulator; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -40,7 +53,10 @@ import java.util.concurrent.Executor; /** * Request handler for the subtask execution attempt accumulators. 
*/ -public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptAccumulatorsInfo, SubtaskAttemptMessageParameters> { +public class SubtaskExecutionAttemptAccumulatorsHandler + extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptAccumulatorsInfo, SubtaskAttemptMessageParameters> + implements JsonArchivist { + /** * Instantiates a new Abstract job vertex handler. * @@ -68,7 +84,39 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA protected SubtaskExecutionAttemptAccumulatorsInfo handleRequest( HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request, AccessExecution execution) throws RestHandlerException { + return createAccumulatorInfo(execution); + } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(16); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + for (AccessExecutionVertex subtask : task.getTaskVertices()) { + ResponseBody curAttemptJson = createAccumulatorInfo(subtask.getCurrentExecutionAttempt()); + String curAttemptPath = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()) + .replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString()) + .replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(':' + SubtaskAttemptPathParameter.KEY, String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber())); + + archive.add(new ArchivedJson(curAttemptPath, curAttemptJson)); + + for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) { + AccessExecution attempt = subtask.getPriorExecutionAttempt(x); + ResponseBody json = createAccumulatorInfo(attempt); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + 
JobIDPathParameter.KEY, graph.getJobID().toString()) + .replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString()) + .replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(':' + SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber())); + archive.add(new ArchivedJson(path, json)); + } + } + } + return archive; + } + private static SubtaskExecutionAttemptAccumulatorsInfo createAccumulatorInfo(AccessExecution execution) { final StringifiedAccumulatorResult[] accs = execution.getUserAccumulatorsStringified(); final ArrayList<UserAccumulator> userAccumulatorList = new ArrayList<>(accs.length); http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java index b781ee7..bae80c7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandler.java @@ -21,6 +21,9 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.handler.HandlerRequest; import 
org.apache.flink.runtime.rest.handler.RestHandlerException; @@ -31,12 +34,23 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.SubtaskIndexPathParameter; import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptMessageParameters; +import org.apache.flink.runtime.rest.messages.job.SubtaskAttemptPathParameter; import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetailsInfo; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.util.Preconditions; +import javax.annotation.Nullable; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -44,7 +58,9 @@ import java.util.concurrent.Executor; /** * Handler of specific sub task execution attempt. 
*/ -public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> { +public class SubtaskExecutionAttemptDetailsHandler + extends AbstractSubtaskAttemptHandler<SubtaskExecutionAttemptDetailsInfo, SubtaskAttemptMessageParameters> + implements JsonArchivist { private final MetricFetcher<?> metricFetcher; @@ -79,11 +95,48 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp HandlerRequest<EmptyRequestBody, SubtaskAttemptMessageParameters> request, AccessExecution execution) throws RestHandlerException { - final MutableIOMetrics ioMetrics = new MutableIOMetrics(); - final JobID jobID = request.getPathParameter(JobIDPathParameter.class); final JobVertexID jobVertexID = request.getPathParameter(JobVertexIdPathParameter.class); + return createDetailsInfo(execution, jobID, jobVertexID, metricFetcher); + } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(16); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + for (AccessExecutionVertex subtask : task.getTaskVertices()) { + ResponseBody curAttemptJson = createDetailsInfo(subtask.getCurrentExecutionAttempt(), graph.getJobID(), task.getJobVertexId(), null); + String curAttemptPath = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()) + .replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString()) + .replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(':' + SubtaskAttemptPathParameter.KEY, String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber())); + + archive.add(new ArchivedJson(curAttemptPath, curAttemptJson)); + + for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) { + AccessExecution 
attempt = subtask.getPriorExecutionAttempt(x); + ResponseBody json = createDetailsInfo(attempt, graph.getJobID(), task.getJobVertexId(), null); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()) + .replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString()) + .replace(':' + SubtaskIndexPathParameter.KEY, String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(':' + SubtaskAttemptPathParameter.KEY, String.valueOf(attempt.getAttemptNumber())); + archive.add(new ArchivedJson(path, json)); + } + } + } + return archive; + } + + private static SubtaskExecutionAttemptDetailsInfo createDetailsInfo( + AccessExecution execution, + JobID jobID, + JobVertexID jobVertexID, + @Nullable MetricFetcher<?> metricFetcher) { + final MutableIOMetrics ioMetrics = new MutableIOMetrics(); + ioMetrics.addIOMetrics( execution, metricFetcher, http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java index 29c0f93..5026951 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java @@ -20,19 +20,27 @@ package org.apache.flink.runtime.rest.handler.job; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import 
org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; +import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.SubtasksTimesInfo; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -42,7 +50,7 @@ import java.util.concurrent.Executor; /** * Request handler for the subtasks times info. */ -public class SubtasksTimesHandler extends AbstractJobVertexHandler<SubtasksTimesInfo, JobVertexMessageParameters> { +public class SubtasksTimesHandler extends AbstractJobVertexHandler<SubtasksTimesInfo, JobVertexMessageParameters> implements JsonArchivist { public SubtasksTimesHandler( CompletableFuture<String> localRestAddress, GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, @@ -63,7 +71,24 @@ public class SubtasksTimesHandler extends AbstractJobVertexHandler<SubtasksTimes @Override protected SubtasksTimesInfo handleRequest(HandlerRequest<EmptyRequestBody, JobVertexMessageParameters> request, AccessExecutionJobVertex jobVertex) { + return createSubtaskTimesInfo(jobVertex); + } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + Collection<? extends AccessExecutionJobVertex> allVertices = graph.getAllVertices().values(); + List<ArchivedJson> archive = new ArrayList<>(allVertices.size()); + for (AccessExecutionJobVertex task : allVertices) { + ResponseBody json = createSubtaskTimesInfo(task); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()) + .replace(':' + JobVertexIdPathParameter.KEY, task.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + private static SubtasksTimesInfo createSubtaskTimesInfo(AccessExecutionJobVertex jobVertex) { final String id = jobVertex.getJobVertexId().toString(); final String name = jobVertex.getName(); final long now = System.currentTimeMillis(); http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java index b88183e..3a03575 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java @@ -27,14 +27,23 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigInfo; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -42,7 +51,7 @@ import java.util.concurrent.Executor; /** * Handler which serves the checkpoint configuration. 
*/ -public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> { +public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<CheckpointConfigInfo, JobMessageParameters> implements JsonArchivist { public CheckpointConfigHandler( CompletableFuture<String> localRestAddress, @@ -64,6 +73,23 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<Check @Override protected CheckpointConfigInfo handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException { + return createCheckpointConfigInfo(executionGraph); + } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + ResponseBody response; + try { + response = createCheckpointConfigInfo(graph); + } catch (RestHandlerException rhe) { + response = new ErrorResponseBody(rhe.getMessage()); + } + String path = CheckpointConfigHeaders.getInstance().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()); + return Collections.singletonList(new ArchivedJson(path, response)); + } + + private static CheckpointConfigInfo createCheckpointConfigInfo(AccessExecutionGraph executionGraph) throws RestHandlerException { final CheckpointCoordinatorConfiguration checkpointCoordinatorConfiguration = executionGraph.getCheckpointCoordinatorConfiguration(); if (checkpointCoordinatorConfiguration == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java index 2816336..8f2d713 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java @@ -20,15 +20,28 @@ package org.apache.flink.runtime.rest.handler.job.checkpoints; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; +import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointMessageParameters; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -36,7 +49,7 @@ import 
java.util.concurrent.Executor; /** * REST handler which returns the details for a checkpoint. */ -public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<CheckpointStatistics, CheckpointMessageParameters> { +public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<CheckpointStatistics, CheckpointMessageParameters> implements JsonArchivist { public CheckpointStatisticDetailsHandler( CompletableFuture<String> localRestAddress, @@ -62,4 +75,22 @@ public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler protected CheckpointStatistics handleCheckpointRequest(HandlerRequest<EmptyRequestBody, CheckpointMessageParameters> ignored, AbstractCheckpointStats checkpointStats) { return CheckpointStatistics.generateCheckpointStatistics(checkpointStats, true); } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot(); + if (stats == null) { + return Collections.emptyList(); + } + CheckpointStatsHistory history = stats.getHistory(); + List<ArchivedJson> archive = new ArrayList<>(history.getCheckpoints().size()); + for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) { + ResponseBody json = CheckpointStatistics.generateCheckpointStatistics(checkpoint, true); + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()) + .replace(':' + CheckpointIdPathParameter.KEY, String.valueOf(checkpoint.getCheckpointId())); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } } http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java index b9db367..d4345ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java @@ -32,17 +32,25 @@ import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.messages.JobIDPathParameter; import org.apache.flink.runtime.rest.messages.JobMessageParameters; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointingStatistics; import org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -51,7 +59,7 @@ import 
java.util.concurrent.Executor; /** * Handler which serves the checkpoint statistics. */ -public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> { +public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandler<CheckpointingStatistics, JobMessageParameters> implements JsonArchivist { public CheckpointingStatisticsHandler( CompletableFuture<String> localRestAddress, @@ -66,7 +74,23 @@ public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandle @Override protected CheckpointingStatistics handleRequest(HandlerRequest<EmptyRequestBody, JobMessageParameters> request, AccessExecutionGraph executionGraph) throws RestHandlerException { + return createCheckpointingStatistics(executionGraph); + } + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + ResponseBody json; + try { + json = createCheckpointingStatistics(graph); + } catch (RestHandlerException rhe) { + json = new ErrorResponseBody(rhe.getMessage()); + } + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + JobIDPathParameter.KEY, graph.getJobID().toString()); + return Collections.singletonList(new ArchivedJson(path, json)); + } + private static CheckpointingStatistics createCheckpointingStatistics(AccessExecutionGraph executionGraph) throws RestHandlerException { final CheckpointStatsSnapshot checkpointStatsSnapshot = executionGraph.getCheckpointStatsSnapshot(); if (checkpointStatsSnapshot == null) { http://git-wip-us.apache.org/repos/asf/flink/blob/bb06ba95/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java index cff3bf0..2084c50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java @@ -20,26 +20,35 @@ package org.apache.flink.runtime.rest.handler.job.checkpoints; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; +import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; import org.apache.flink.runtime.checkpoint.SubtaskStateStats; import org.apache.flink.runtime.checkpoint.TaskStateStats; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.rest.NotFoundException; import org.apache.flink.runtime.rest.handler.HandlerRequest; import org.apache.flink.runtime.rest.handler.RestHandlerException; import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter; import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointIdPathParameter; import org.apache.flink.runtime.rest.messages.checkpoints.MinMaxAvgStatistics; import org.apache.flink.runtime.rest.messages.checkpoints.SubtaskCheckpointStatistics; import org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointMessageParameters; import 
org.apache.flink.runtime.rest.messages.checkpoints.TaskCheckpointStatisticsWithSubtaskDetails; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; -import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; - +import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -48,7 +57,9 @@ import java.util.concurrent.Executor; /** * REST handler which serves checkpoint statistics for subtasks. */ -public class TaskCheckpointStatisticDetailsHandler extends AbstractCheckpointHandler<TaskCheckpointStatisticsWithSubtaskDetails, TaskCheckpointMessageParameters> { +public class TaskCheckpointStatisticDetailsHandler + extends AbstractCheckpointHandler<TaskCheckpointStatisticsWithSubtaskDetails, TaskCheckpointMessageParameters> + implements JsonArchivist { public TaskCheckpointStatisticDetailsHandler( CompletableFuture<String> localRestAddress, @@ -79,30 +90,54 @@ public class TaskCheckpointStatisticDetailsHandler extends AbstractCheckpointHan final TaskStateStats taskStatistics = checkpointStats.getTaskStateStats(jobVertexId); - if (taskStatistics != null) { - - final TaskCheckpointStatisticsWithSubtaskDetails.Summary summary = createSummary( - taskStatistics.getSummaryStats(), - checkpointStats.getTriggerTimestamp()); - - final List<SubtaskCheckpointStatistics> subtaskCheckpointStatistics = createSubtaskCheckpointStatistics( - taskStatistics.getSubtaskStats(), - checkpointStats.getTriggerTimestamp()); - - return new TaskCheckpointStatisticsWithSubtaskDetails( - checkpointStats.getCheckpointId(), - checkpointStats.getStatus(), - taskStatistics.getLatestAckTimestamp(), - 
taskStatistics.getStateSize(), - taskStatistics.getEndToEndDuration(checkpointStats.getTriggerTimestamp()), - taskStatistics.getAlignmentBuffered(), - taskStatistics.getNumberOfSubtasks(), - taskStatistics.getNumberOfAcknowledgedSubtasks(), - summary, - subtaskCheckpointStatistics); - } else { - throw new RestHandlerException("There is no checkpoint statistics for task " + jobVertexId + '.', HttpResponseStatus.NOT_FOUND); + if (taskStatistics == null) { + throw new NotFoundException("There is no checkpoint statistics for task " + jobVertexId + '.'); + } + + return createCheckpointDetails(checkpointStats, taskStatistics); + } + + /** + * Archives the per-task checkpoint details of every checkpoint in the graph's checkpoint history, + * one entry per (checkpoint, task) pair, each stored under this handler's endpoint URL with the + * job ID, checkpoint ID and job vertex ID placeholders filled in. + * + * @param graph execution graph whose checkpoint statistics are archived + * @return the archived JSON responses; an empty list if the graph has no checkpoint stats snapshot + * @throws IOException declared by the overridden signature + */ + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot(); + if (stats == null) { + return Collections.emptyList(); + } + CheckpointStatsHistory history = stats.getHistory(); + List<ArchivedJson> archive = new ArrayList<>(history.getCheckpoints().size()); + for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) { + for (TaskStateStats subtaskStats : checkpoint.getAllTaskStateStats()) { + ResponseBody json = createCheckpointDetails(checkpoint, subtaskStats); + // The job ID must go into the job ID placeholder; substituting it for the vertex ID placeholder + // would leave the job ID placeholder unresolved and swallow the vertex ID substitution below. + // JobIDPathParameter is fully qualified because this file does not import it. + String path = getMessageHeaders().getTargetRestEndpointURL() + .replace(':' + org.apache.flink.runtime.rest.messages.JobIDPathParameter.KEY, graph.getJobID().toString()) + .replace(':' + CheckpointIdPathParameter.KEY, String.valueOf(checkpoint.getCheckpointId())) + .replace(':' + JobVertexIdPathParameter.KEY, subtaskStats.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } } + return archive; + } + + private static TaskCheckpointStatisticsWithSubtaskDetails createCheckpointDetails(AbstractCheckpointStats checkpointStats, TaskStateStats taskStatistics) { + final TaskCheckpointStatisticsWithSubtaskDetails.Summary summary = createSummary( + taskStatistics.getSummaryStats(), + checkpointStats.getTriggerTimestamp()); + + final List<SubtaskCheckpointStatistics> 
subtaskCheckpointStatistics = createSubtaskCheckpointStatistics( + taskStatistics.getSubtaskStats(), + checkpointStats.getTriggerTimestamp()); + + return new TaskCheckpointStatisticsWithSubtaskDetails( + checkpointStats.getCheckpointId(), + checkpointStats.getStatus(), + taskStatistics.getLatestAckTimestamp(), + taskStatistics.getStateSize(), + taskStatistics.getEndToEndDuration(checkpointStats.getTriggerTimestamp()), + taskStatistics.getAlignmentBuffered(), + taskStatistics.getNumberOfSubtasks(), + taskStatistics.getNumberOfAcknowledgedSubtasks(), + summary, + subtaskCheckpointStatistics); } private static TaskCheckpointStatisticsWithSubtaskDetails.Summary createSummary(TaskStateStats.TaskStateStatsSummary taskStatisticsSummary, long triggerTimestamp) {
