[FLINK-5941] Integrate Archiver pattern into handlers This closes #3444.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7fe0eb47 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7fe0eb47 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7fe0eb47 Branch: refs/heads/master Commit: 7fe0eb477df52cfd7254695a67d41f3cba34ef0a Parents: 243ef69 Author: zentol <[email protected]> Authored: Mon Feb 20 16:30:35 2017 +0100 Committer: zentol <[email protected]> Committed: Thu Mar 2 18:27:15 2017 +0100 ---------------------------------------------------------------------- .../runtime/webmonitor/WebRuntimeMonitor.java | 42 ++++++ .../handlers/CurrentJobsOverviewHandler.java | 26 ++++ .../handlers/JobAccumulatorsHandler.java | 15 ++ .../webmonitor/handlers/JobConfigHandler.java | 16 ++ .../webmonitor/handlers/JobDetailsHandler.java | 20 +++ .../handlers/JobExceptionsHandler.java | 15 ++ .../webmonitor/handlers/JobPlanHandler.java | 16 ++ .../handlers/JobVertexAccumulatorsHandler.java | 22 +++ .../handlers/JobVertexDetailsHandler.java | 22 +++ .../handlers/JobVertexTaskManagersHandler.java | 20 +++ ...taskExecutionAttemptAccumulatorsHandler.java | 40 +++++ .../SubtaskExecutionAttemptDetailsHandler.java | 47 ++++++ .../SubtasksAllAccumulatorsHandler.java | 22 +++ .../handlers/SubtasksTimesHandler.java | 22 +++ .../checkpoints/CheckpointConfigHandler.java | 15 ++ .../CheckpointStatsDetailsHandler.java | 28 ++++ .../CheckpointStatsDetailsSubtasksHandler.java | 31 ++++ .../checkpoints/CheckpointStatsHandler.java | 15 ++ .../CurrentJobsOverviewHandlerTest.java | 32 +++- .../handlers/JobAccumulatorsHandlerTest.java | 23 +++ .../handlers/JobConfigHandlerTest.java | 21 +++ .../handlers/JobDetailsHandlerTest.java | 28 ++++ .../handlers/JobExceptionsHandlerTest.java | 23 +++ .../webmonitor/handlers/JobPlanHandlerTest.java | 20 +++ .../JobVertexAccumulatorsHandlerTest.java | 25 ++++ .../handlers/JobVertexDetailsHandlerTest.java | 25 ++++ .../JobVertexTaskManagersHandlerTest.java | 
26 ++++ ...ExecutionAttemptAccumulatorsHandlerTest.java | 33 ++++ ...btaskExecutionAttemptDetailsHandlerTest.java | 40 +++++ .../SubtasksAllAccumulatorsHandlerTest.java | 25 ++++ .../handlers/SubtasksTimesHandlerTest.java | 26 ++++ .../CheckpointConfigHandlerTest.java | 140 ++++++++++------- .../CheckpointStatsDetailsHandlerTest.java | 121 +++++++++++---- .../checkpoints/CheckpointStatsHandlerTest.java | 150 +++++++++++++++++-- ...heckpointStatsSubtaskDetailsHandlerTest.java | 41 +++++ .../utils/ArchivedJobGenerationUtils.java | 2 +- .../webmonitor/history/ArchivedJson.java | 45 ++++++ .../webmonitor/history/JsonArchivist.java | 46 ++++++ 38 files changed, 1231 insertions(+), 95 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index dddc69d..e604ce8 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -37,6 +37,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.webmonitor.files.StaticFileServerHandler; @@ -77,6 +78,7 @@ import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsC import 
org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsHandler; import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler; import org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsDetailsSubtasksHandler; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.metrics.JobManagerMetricsHandler; import org.apache.flink.runtime.webmonitor.metrics.JobMetricsHandler; import org.apache.flink.runtime.webmonitor.metrics.JobVertexMetricsHandler; @@ -424,6 +426,46 @@ public class WebRuntimeMonitor implements WebMonitor { LOG.info("Web frontend listening at " + address + ':' + port); } + /** + * Returns an array of all {@link JsonArchivist}s that are relevant for the history server. + * + * This method is static to allow easier access from the {@link MemoryArchivist}. Requiring a reference + * would imply that the WebRuntimeMonitor is always created before the archivist, which may not hold for all + * deployment modes. + * + * Similarly, no handler implements the JsonArchivist interface itself but instead contains a separate implementing + * class; otherwise we would either instantiate several handlers even though their main functionality isn't + * required, or yet again require that the WebRuntimeMonitor is started before the archivist. 
+ * + * @return array of all JsonArchivists relevant for the history server + */ + public static JsonArchivist[] getArchivers() { + JsonArchivist[] archivists = new JsonArchivist[]{ + new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(), + + new JobPlanHandler.JobPlanJsonArchivist(), + new JobConfigHandler.JobConfigJsonArchivist(), + new JobExceptionsHandler.JobExceptionsJsonArchivist(), + new JobDetailsHandler.JobDetailsJsonArchivist(), + new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist(), + + new CheckpointStatsHandler.CheckpointStatsJsonArchivist(), + new CheckpointConfigHandler.CheckpointConfigJsonArchivist(), + new CheckpointStatsDetailsHandler.CheckpointStatsDetailsJsonArchivist(), + new CheckpointStatsDetailsSubtasksHandler.CheckpointStatsDetailsSubtasksJsonArchivist(), + + new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist(), + new SubtasksTimesHandler.SubtasksTimesJsonArchivist(), + new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist(), + new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist(), + new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist(), + + new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist(), + new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist() + }; + return archivists; + } + @Override public void start(String jobManagerAkkaUrl) throws Exception { LOG.info("Starting with JobManager {} on port {}", jobManagerAkkaUrl, getServerPort()); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java index 00cf138..60a2b27 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java @@ -20,16 +20,22 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; import org.apache.flink.runtime.messages.webmonitor.RequestJobDetails; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; import java.io.IOException; import java.io.StringWriter; +import java.util.Collection; +import java.util.Collections; import java.util.Map; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -121,6 +127,26 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler { } } + public static class CurrentJobsOverviewJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + StringWriter writer = new StringWriter(); + try (JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer)) { + gen.writeStartObject(); + gen.writeArrayFieldStart("running"); + gen.writeEndArray(); + gen.writeArrayFieldStart("finished"); + 
writeJobDetailOverviewAsJson(WebMonitorUtils.createDetailsForJob(graph), gen, System.currentTimeMillis()); + gen.writeEndArray(); + gen.writeEndObject(); + } + String json = writer.toString(); + String path = ALL_JOBS_REST_PATH; + return Collections.singleton(new ArchivedJson(path, json)); + } + } + public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException { gen.writeStartObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java index dfc654e..c403aa2 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java @@ -22,9 +22,13 @@ import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import java.io.IOException; import java.io.StringWriter; +import java.util.Collection; +import java.util.Collections; import java.util.Map; /** @@ -48,6 +52,17 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler return createJobAccumulatorsJson(graph); } + public static class JobAccumulatorsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> 
archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + String json = createJobAccumulatorsJson(graph); + String path = JOB_ACCUMULATORS_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + return Collections.singletonList(new ArchivedJson(path, json)); + } + } + public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java index 7d72235..2b96456 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java @@ -26,6 +26,11 @@ import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; + +import java.util.Collection; +import java.util.Collections; /** * Request handler that returns the execution config of a job. 
@@ -48,6 +53,17 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { return createJobConfigJson(graph); } + public static class JobConfigJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + String json = createJobConfigJson(graph); + String path = JOB_CONFIG_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + return Collections.singletonList(new ArchivedJson(path, json)); + } + } + public static String createJobConfigJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java index 6d1f82f..029a4b5 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java @@ -27,12 +27,16 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; import javax.annotation.Nullable; 
import java.io.IOException; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; import java.util.Map; /** @@ -67,6 +71,22 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { return createJobDetailsJson(graph, fetcher); } + public static class JobDetailsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + String json = createJobDetailsJson(graph, null); + String path1 = JOB_DETAILS_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + String path2 = JOB_DETAILS_VERTICES_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + Collection<ArchivedJson> archives = new ArrayList<>(); + archives.add(new ArchivedJson(path1, json)); + archives.add(new ArchivedJson(path2, json)); + return archives; + } + } + public static String createJobDetailsJson(AccessExecutionGraph graph, @Nullable MetricFetcher fetcher) throws IOException { final StringWriter writer = new StringWriter(); final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java index 0cce61f..81cdc83 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java @@ -23,10 +23,14 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import 
org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.util.ExceptionUtils; import java.io.IOException; import java.io.StringWriter; +import java.util.Collection; +import java.util.Collections; import java.util.Map; /** @@ -52,6 +56,17 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { return createJobExceptionsJson(graph); } + public static class JobExceptionsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + String json = createJobExceptionsJson(graph); + String path = JOB_EXCEPTIONS_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + return Collections.singletonList(new ArchivedJson(path, json)); + } + } + public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java index becc2e1..885d04e 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandler.java @@ -20,7 +20,12 @@ package 
org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; import java.util.Map; /** @@ -43,4 +48,15 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler { public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { return graph.getJsonPlan(); } + + public static class JobPlanJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + String path = JOB_PLAN_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + String json = graph.getJsonPlan(); + return Collections.singletonList(new ArchivedJson(path, json)); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java index ca0488b..2532a1e 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java @@ -21,11 +21,17 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import 
org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import java.io.IOException; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; @@ -47,6 +53,22 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle return createVertexAccumulatorsJson(jobVertex); } + public static class JobVertexAccumulatorsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + String json = createVertexAccumulatorsJson(task); + String path = JOB_VERTEX_ACCUMULATORS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + } + public static String createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java index 6e7e47c..d9a1131 100644 --- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java @@ -21,16 +21,22 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; import javax.annotation.Nullable; import java.io.IOException; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; /** @@ -58,6 +64,22 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher); } + public static class JobVertexDetailsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + String json = createVertexDetailsJson(task, graph.getJobID().toString(), null); + String path = JOB_VERTEX_DETAILS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + return 
archive; + } + } + public static String createVertexDetailsJson( AccessExecutionJobVertex jobVertex, String jobID, http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java index 4fa54bd..3878722 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java @@ -20,11 +20,14 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; @@ -32,6 +35,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -61,6 +65,22 @@ public class 
JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher); } + public static class JobVertexTaskManagersJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + String json = createVertexDetailsByTaskManagerJson(task, graph.getJobID().toString(), null); + String path = JOB_VERTEX_TASKMANAGERS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + } + public static String createVertexDetailsByTaskManagerJson( AccessExecutionJobVertex jobVertex, String jobID, http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java index a63016c..9026a22 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -21,10 +21,18 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.executiongraph.AccessExecution; 
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import java.io.IOException; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; /** @@ -49,6 +57,38 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA return createAttemptAccumulatorsJson(execAttempt); } + public static class SubtaskExecutionAttemptAccumulatorsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + for (AccessExecutionVertex subtask : task.getTaskVertices()) { + String curAttemptJson = createAttemptAccumulatorsJson(subtask.getCurrentExecutionAttempt()); + String curAttemptPath = SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()) + .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(":attempt", String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber())); + + archive.add(new ArchivedJson(curAttemptPath, curAttemptJson)); + + for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) { + AccessExecution attempt = subtask.getPriorExecutionAttempt(x); + String json = createAttemptAccumulatorsJson(attempt); + String path = SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", 
task.getJobVertexId().toString()) + .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(":attempt", String.valueOf(attempt.getAttemptNumber())); + archive.add(new ArchivedJson(path, json)); + } + } + } + return archive; + } + } + public static String createAttemptAccumulatorsJson(AccessExecution execAttempt) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java index 5af6af9..078f54a 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java @@ -22,16 +22,26 @@ import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import 
org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; import javax.annotation.Nullable; import java.io.IOException; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; +import static org.apache.flink.runtime.webmonitor.handlers.SubtaskCurrentAttemptDetailsHandler.SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH; + /** * Request handler providing details about a single task execution attempt. */ @@ -56,6 +66,43 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher); } + public static class SubtaskExecutionAttemptDetailsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + for (AccessExecutionVertex subtask : task.getTaskVertices()) { + String curAttemptJson = createAttemptDetailsJson(subtask.getCurrentExecutionAttempt(), graph.getJobID().toString(), task.getJobVertexId().toString(), null); + String curAttemptPath1 = SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()) + .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())); + String curAttemptPath2 = SUBTASK_ATTEMPT_DETAILS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()) + .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(":attempt", String.valueOf(subtask.getCurrentExecutionAttempt().getAttemptNumber())); + + archive.add(new ArchivedJson(curAttemptPath1, curAttemptJson)); + archive.add(new 
ArchivedJson(curAttemptPath2, curAttemptJson)); + + for (int x = 0; x < subtask.getCurrentExecutionAttempt().getAttemptNumber(); x++) { + AccessExecution attempt = subtask.getPriorExecutionAttempt(x); + String json = createAttemptDetailsJson(attempt, graph.getJobID().toString(), task.getJobVertexId().toString(), null); + String path = SUBTASK_ATTEMPT_DETAILS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()) + .replace(":subtasknum", String.valueOf(subtask.getParallelSubtaskIndex())) + .replace(":attempt", String.valueOf(attempt.getAttemptNumber())); + archive.add(new ArchivedJson(path, json)); + } + } + } + return archive; + } + } + public static String createAttemptDetailsJson( AccessExecution execAttempt, String jobID, http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java index 10a8773..6c3bc18 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java @@ -21,13 +21,19 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import java.io.IOException; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; /** @@ -51,6 +57,22 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand return createSubtasksAccumulatorsJson(jobVertex); } + public static class SubtasksAllAccumulatorsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + String json = createSubtasksAccumulatorsJson(task); + String path = SUBTASKS_ALL_ACCUMULATORS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + } + public static String createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java index 08bd722..adefa80 100644 --- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java @@ -21,13 +21,19 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import java.io.IOException; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import java.util.Map; /** @@ -52,6 +58,22 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { return createSubtaskTimesJson(jobVertex); } + public static class SubtasksTimesJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + List<ArchivedJson> archive = new ArrayList<>(); + for (AccessExecutionJobVertex task : graph.getAllVertices().values()) { + String json = createSubtaskTimesJson(task); + String path = SUBTASK_TIMES_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":vertexid", task.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + } + public static String createSubtaskTimesJson(AccessExecutionJobVertex jobVertex) throws IOException { final long now = System.currentTimeMillis(); 
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java index 9976298..947b7c3 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java @@ -25,9 +25,13 @@ import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import java.io.IOException; import java.io.StringWriter; +import java.util.Collection; +import java.util.Collections; import java.util.Map; /** @@ -51,6 +55,17 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle return createCheckpointConfigJson(graph); } + public static class CheckpointConfigJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + String json = createCheckpointConfigJson(graph); + String path = CHECKPOINT_CONFIG_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + return Collections.singletonList(new ArchivedJson(path, json)); + } + } + private static String createCheckpointConfigJson(AccessExecutionGraph 
graph) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java index 4bbb8f6..16fd9bd 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStats; import org.apache.flink.runtime.checkpoint.FailedCheckpointStats; @@ -28,9 +29,15 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import java.io.IOException; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import 
java.util.Collections; +import java.util.List; import java.util.Map; /** @@ -79,6 +86,27 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest return createCheckpointDetailsJson(checkpoint); } + public static class CheckpointStatsDetailsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot(); + if (stats == null) { + return Collections.emptyList(); + } + CheckpointStatsHistory history = stats.getHistory(); + List<ArchivedJson> archive = new ArrayList<>(); + for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) { + String json = createCheckpointDetailsJson(checkpoint); + String path = CHECKPOINT_STATS_DETAILS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId())); + archive.add(new ArchivedJson(path, json)); + } + return archive; + } + } + public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java index b28ecef..bb39b2c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java +++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java @@ -20,6 +20,7 @@ package org.apache.flink.runtime.webmonitor.handlers.checkpoints; import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.checkpoint.AbstractCheckpointStats; +import org.apache.flink.runtime.checkpoint.CheckpointStatsHistory; import org.apache.flink.runtime.checkpoint.CheckpointStatsSnapshot; import org.apache.flink.runtime.checkpoint.MinMaxAvgStats; import org.apache.flink.runtime.checkpoint.SubtaskStateStats; @@ -31,9 +32,15 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.webmonitor.handlers.AbstractJobVertexRequestHandler; import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import java.io.IOException; import java.io.StringWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; import java.util.Map; import static org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg; @@ -104,6 +111,30 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap return createSubtaskCheckpointDetailsJson(checkpoint, taskStats); } + public static class CheckpointStatsDetailsSubtasksJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + CheckpointStatsSnapshot stats = graph.getCheckpointStatsSnapshot(); + if (stats == null) { + return Collections.emptyList(); + } + CheckpointStatsHistory history = stats.getHistory(); + List<ArchivedJson> archive = new ArrayList<>(); + for (AbstractCheckpointStats 
checkpoint : history.getCheckpoints()) { + for (TaskStateStats subtaskStats : checkpoint.getAllTaskStateStats()) { + String json = createSubtaskCheckpointDetailsJson(checkpoint, subtaskStats); + String path = CHECKPOINT_STATS_DETAILS_SUBTASKS_REST_PATH + .replace(":jobid", graph.getJobID().toString()) + .replace(":checkpointid", String.valueOf(checkpoint.getCheckpointId())) + .replace(":vertexid", subtaskStats.getJobVertexId().toString()); + archive.add(new ArchivedJson(path, json)); + } + } + return archive; + } + } + private static String createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, TaskStateStats taskStats) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java index 585ab26..f004888 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java @@ -32,10 +32,14 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import 
javax.annotation.Nullable; import java.io.IOException; import java.io.StringWriter; +import java.util.Collection; +import java.util.Collections; import java.util.Map; /** @@ -59,6 +63,17 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler return createCheckpointStatsJson(graph); } + public static class CheckpointStatsJsonArchivist implements JsonArchivist { + + @Override + public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph) throws IOException { + String json = createCheckpointStatsJson(graph); + String path = CHECKPOINT_STATS_REST_PATH + .replace(":jobid", graph.getJobID().toString()); + return Collections.singletonList(new ArchivedJson(path, json)); + } + } + private static String createCheckpointStatsJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java index caf6d8e..097961e 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java @@ -19,19 +19,47 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.flink.runtime.execution.ExecutionState; import 
org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.messages.webmonitor.JobDetails; import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.io.StringWriter; +import java.util.Collection; import java.util.concurrent.TimeUnit; public class CurrentJobsOverviewHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new CurrentJobsOverviewHandler.CurrentJobsOverviewJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals("/joboverview", archive.getPath()); + + JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(archive.getJson()); + ArrayNode running = (ArrayNode) result.get("running"); + Assert.assertEquals(0, running.size()); + + ArrayNode finished = (ArrayNode) result.get("finished"); + Assert.assertEquals(1, finished.size()); + + compareJobOverview(expectedDetails, finished.get(0).toString()); + } + @Test public void testGetPaths() { CurrentJobsOverviewHandler handlerAll = new CurrentJobsOverviewHandler(new FiniteDuration(0, TimeUnit.SECONDS), true, true); @@ -58,8 +86,10 @@ public class CurrentJobsOverviewHandlerTest { try (JsonGenerator gen = ArchivedJobGenerationUtils.jacksonFactory.createGenerator(writer)) { CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 0); } - String answer 
= writer.toString(); + compareJobOverview(expectedDetails, writer.toString()); + } + private static void compareJobOverview(JobDetails expectedDetails, String answer) throws IOException { JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(answer); Assert.assertEquals(expectedDetails.getJobId().toString(), result.get("jid").asText()); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java index 34748b7..f8ea792 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java @@ -20,11 +20,30 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.util.Collection; + public class JobAccumulatorsHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new JobAccumulatorsHandler.JobAccumulatorsJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + 
Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/accumulators", archive.getPath()); + compareAccumulators(originalJob, archive.getJson()); + } + @Test public void testGetPaths() { JobAccumulatorsHandler handler = new JobAccumulatorsHandler(null); @@ -38,6 +57,10 @@ public class JobAccumulatorsHandlerTest { AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); String json = JobAccumulatorsHandler.createJobAccumulatorsJson(originalJob); + compareAccumulators(originalJob, json); + } + + private static void compareAccumulators(AccessExecutionGraph originalJob, String json) throws IOException { JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); ArrayNode accs = (ArrayNode) result.get("job-accumulators"); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java index f304efe..f47b8ca 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java @@ -20,13 +20,31 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.api.common.ArchivedExecutionConfig; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import 
org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.util.Collection; import java.util.Map; public class JobConfigHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new JobConfigHandler.JobConfigJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/config", archive.getPath()); + compareJobConfig(originalJob, archive.getJson()); + } + @Test public void testGetPaths() { JobConfigHandler handler = new JobConfigHandler(null); @@ -38,7 +56,10 @@ public class JobConfigHandlerTest { public void testJsonGeneration() throws Exception { AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); String answer = JobConfigHandler.createJobConfigJson(originalJob); + compareJobConfig(originalJob, answer); + } + private static void compareJobConfig(AccessExecutionGraph originalJob, String answer) throws IOException { JsonNode job = ArchivedJobGenerationUtils.mapper.readTree(answer); Assert.assertEquals(originalJob.getJobID().toString(), job.get("jid").asText()); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java index 3f80d12..0c4fb7e 100644 --- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java @@ -26,13 +26,37 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; import java.util.List; public class JobDetailsHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new JobDetailsHandler.JobDetailsJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(2, archives.size()); + + Iterator<ArchivedJson> iterator = archives.iterator(); + ArchivedJson archive1 = iterator.next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID(), archive1.getPath()); + compareJobDetails(originalJob, archive1.getJson()); + + ArchivedJson archive2 = iterator.next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices", archive2.getPath()); + compareJobDetails(originalJob, archive2.getJson()); + } + @Test public void testGetPaths() { JobDetailsHandler handler = new JobDetailsHandler(null, null); @@ -48,6 +72,10 @@ public class JobDetailsHandlerTest { AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); String json = JobDetailsHandler.createJobDetailsJson(originalJob, null); + 
compareJobDetails(originalJob, json); + } + + private static void compareJobDetails(AccessExecutionGraph originalJob, String json) throws IOException { JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); Assert.assertEquals(originalJob.getJobID().toString(), result.get("jid").asText()); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java index c86ce6a..c51053a 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java @@ -22,12 +22,31 @@ import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.apache.flink.util.ExceptionUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.util.Collection; + public class JobExceptionsHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new JobExceptionsHandler.JobExceptionsJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + + Collection<ArchivedJson> archives = 
archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/exceptions", archive.getPath()); + compareExceptions(originalJob, archive.getJson()); + } + @Test public void testGetPaths() { JobExceptionsHandler handler = new JobExceptionsHandler(null); @@ -41,6 +60,10 @@ public class JobExceptionsHandlerTest { AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); String json = JobExceptionsHandler.createJobExceptionsJson(originalJob); + compareExceptions(originalJob, json); + } + + private static void compareExceptions(AccessExecutionGraph originalJob, String json) throws IOException { JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); Assert.assertEquals(originalJob.getFailureCauseAsString(), result.get("root-exception").asText()); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java index 42808ed..2ef5bb9 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobPlanHandlerTest.java @@ -17,10 +17,30 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; 
import org.junit.Test; +import java.util.Collection; + public class JobPlanHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new JobPlanHandler.JobPlanJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/plan", archive.getPath()); + Assert.assertEquals(originalJob.getJsonPlan(), archive.getJson()); + } + @Test public void testGetPaths() { JobPlanHandler handler = new JobPlanHandler(null); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java index 03c1896..8c88da8 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java @@ -20,12 +20,33 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import 
org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.util.Collection; + public class JobVertexAccumulatorsHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new JobVertexAccumulatorsHandler.JobVertexAccumulatorsJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/accumulators", archive.getPath()); + compareAccumulators(originalTask, archive.getJson()); + } + @Test public void testGetPaths() { JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(null); @@ -39,6 +60,10 @@ public class JobVertexAccumulatorsHandlerTest { AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); String json = JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(originalTask); + compareAccumulators(originalTask, json); + } + + private static void compareAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException { JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java index e909c8c..0fae8b5 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java @@ -20,14 +20,35 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.util.Collection; + public class JobVertexDetailsHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new JobVertexDetailsHandler.JobVertexDetailsJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId(), archive.getPath()); + 
compareVertexDetails(originalTask, archive.getJson()); + } + @Test public void testGetPaths() { JobVertexDetailsHandler handler = new JobVertexDetailsHandler(null, null); @@ -42,6 +63,10 @@ public class JobVertexDetailsHandlerTest { String json = JobVertexDetailsHandler.createVertexDetailsJson( originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null); + compareVertexDetails(originalTask, json); + } + + private static void compareVertexDetails(AccessExecutionJobVertex originalTask, String json) throws IOException { JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java index 11e35e5..9271712 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java @@ -20,15 +20,37 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.IOMetrics; import 
org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.util.Collection; + public class JobVertexTaskManagersHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new JobVertexTaskManagersHandler.JobVertexTaskManagersJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + "/taskmanagers", archive.getPath()); + compareVertexTaskManagers(originalTask, originalSubtask, archive.getJson()); + } + @Test public void testGetPaths() { JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(null, null); @@ -44,6 +66,10 @@ public class JobVertexTaskManagersHandlerTest { String json = JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson( originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null); + compareVertexTaskManagers(originalTask, originalSubtask, json); + } + + private static void compareVertexTaskManagers(AccessExecutionJobVertex originalTask, AccessExecutionVertex originalSubtask, String json) throws IOException { JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); 
http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java index 8d24bd0..5993d5c 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java @@ -20,11 +20,40 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.util.Collection; + public class SubtaskExecutionAttemptAccumulatorsHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new SubtaskExecutionAttemptAccumulatorsHandler.SubtaskExecutionAttemptAccumulatorsJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals( + "/jobs/" + originalJob.getJobID() + + "/vertices/" + originalTask.getJobVertexId() + + "/subtasks/" + originalAttempt.getParallelSubtaskIndex() + + "/attempts/" + originalAttempt.getAttemptNumber() + + "/accumulators", + archive.getPath()); + compareAttemptAccumulators(originalAttempt, archive.getJson()); + } + @Test public void testGetPaths() { SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(null); @@ -38,6 +67,10 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest { AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); String json = SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt); + compareAttemptAccumulators(originalAttempt, json); + } + + private static void compareAttemptAccumulators(AccessExecution originalAttempt, String json) throws IOException { JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt()); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java index 54f3f9c..f18858e 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java +++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -22,11 +22,47 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; + public class SubtaskExecutionAttemptDetailsHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new SubtaskExecutionAttemptDetailsHandler.SubtaskExecutionAttemptDetailsJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(2, archives.size()); + + Iterator<ArchivedJson> iterator = archives.iterator(); + ArchivedJson archive1 = iterator.next(); + Assert.assertEquals( + "/jobs/" + originalJob.getJobID() + + "/vertices/" + originalTask.getJobVertexId() + + "/subtasks/" + originalAttempt.getParallelSubtaskIndex(), + archive1.getPath()); + compareAttemptDetails(originalAttempt, archive1.getJson()); + + ArchivedJson archive2 = iterator.next(); + Assert.assertEquals( + "/jobs/" + originalJob.getJobID() + + "/vertices/" + originalTask.getJobVertexId() + + "/subtasks/" + originalAttempt.getParallelSubtaskIndex() + + "/attempts/" + originalAttempt.getAttemptNumber(), + 
archive2.getPath()); + compareAttemptDetails(originalAttempt, archive2.getJson()); + } + @Test public void testGetPaths() { SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(null, null); @@ -43,6 +79,10 @@ public class SubtaskExecutionAttemptDetailsHandlerTest { String json = SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson( originalAttempt, originalJob.getJobID().toString(), originalTask.getJobVertexId().toString(), null); + compareAttemptDetails(originalAttempt, json); + } + + private static void compareAttemptDetails(AccessExecution originalAttempt, String json) throws IOException { JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt()); http://git-wip-us.apache.org/repos/asf/flink/blob/7fe0eb47/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java index 954ebad..dfbe618 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java @@ -19,13 +19,35 @@ package org.apache.flink.runtime.webmonitor.handlers; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import 
org.apache.flink.runtime.webmonitor.history.ArchivedJson; +import org.apache.flink.runtime.webmonitor.history.JsonArchivist; import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.util.Collection; + public class SubtasksAllAccumulatorsHandlerTest { + + @Test + public void testArchiver() throws Exception { + JsonArchivist archivist = new SubtasksAllAccumulatorsHandler.SubtasksAllAccumulatorsJsonArchivist(); + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + + Collection<ArchivedJson> archives = archivist.archiveJsonWithPath(originalJob); + Assert.assertEquals(1, archives.size()); + + ArchivedJson archive = archives.iterator().next(); + Assert.assertEquals("/jobs/" + originalJob.getJobID() + "/vertices/" + originalTask.getJobVertexId() + + "/subtasks/accumulators", archive.getPath()); + compareSubtaskAccumulators(originalTask, archive.getJson()); + } + @Test public void testGetPaths() { SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(null); @@ -38,7 +60,10 @@ public class SubtasksAllAccumulatorsHandlerTest { public void testJsonGeneration() throws Exception { AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); String json = SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(originalTask); + compareSubtaskAccumulators(originalTask, json); + } + private static void compareSubtaskAccumulators(AccessExecutionJobVertex originalTask, String json) throws IOException { JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText());
