[FLINK-5852] Move handler JSON generation code into static methods This closes #3365.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a552d674 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a552d674 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a552d674 Branch: refs/heads/master Commit: a552d6746636e26c634c86d6a11732ea9d2f239e Parents: 999bace Author: zentol <[email protected]> Authored: Tue Feb 28 18:01:14 2017 +0100 Committer: zentol <[email protected]> Committed: Thu Mar 2 11:39:04 2017 +0100 ---------------------------------------------------------------------- .../api/common/ArchivedExecutionConfig.java | 13 ++ .../handlers/CurrentJobsOverviewHandler.java | 9 +- .../handlers/DashboardConfigHandler.java | 50 +++--- .../handlers/JobAccumulatorsHandler.java | 9 +- .../webmonitor/handlers/JobConfigHandler.java | 4 + .../webmonitor/handlers/JobDetailsHandler.java | 44 ++---- .../handlers/JobExceptionsHandler.java | 11 +- .../handlers/JobVertexAccumulatorsHandler.java | 9 +- .../handlers/JobVertexDetailsHandler.java | 50 +++--- .../handlers/JobVertexTaskManagersHandler.java | 55 +++---- ...taskExecutionAttemptAccumulatorsHandler.java | 7 +- .../SubtaskExecutionAttemptDetailsHandler.java | 55 +++---- .../SubtasksAllAccumulatorsHandler.java | 5 + .../handlers/SubtasksTimesHandler.java | 5 + .../checkpoints/CheckpointConfigHandler.java | 6 +- .../CheckpointStatsDetailsHandler.java | 4 +- .../CheckpointStatsDetailsSubtasksHandler.java | 23 ++- .../checkpoints/CheckpointStatsHandler.java | 16 +- .../webmonitor/utils/MutableIOMetrics.java | 106 +++++++++++++ .../CurrentJobsOverviewHandlerTest.java | 42 +++++ .../handlers/DashboardConfigHandlerTest.java | 22 +++ .../handlers/JobAccumulatorsHandlerTest.java | 20 +++ .../handlers/JobConfigHandlerTest.java | 31 ++++ .../handlers/JobDetailsHandlerTest.java | 95 +++++++++++ .../handlers/JobExceptionsHandlerTest.java | 35 ++++ .../JobVertexAccumulatorsHandlerTest.java | 20 +++ .../handlers/JobVertexDetailsHandlerTest.java | 44 ++++++ .../JobVertexTaskManagersHandlerTest.java | 66 ++++++++ ...ExecutionAttemptAccumulatorsHandlerTest.java | 18 +++ ...btaskExecutionAttemptDetailsHandlerTest.java | 29 ++++ .../SubtasksAllAccumulatorsHandlerTest.java | 32 ++++ .../handlers/SubtasksTimesHandlerTest.java | 37 +++++ .../utils/ArchivedExecutionBuilder.java | 146 +++++++++++++++++ .../utils/ArchivedExecutionConfigBuilder.java | 67 ++++++++ .../utils/ArchivedExecutionGraphBuilder.java | 135 ++++++++++++++++ .../ArchivedExecutionJobVertexBuilder.java | 80 ++++++++++ .../utils/ArchivedExecutionVertexBuilder.java | 69 ++++++++ .../utils/ArchivedJobGenerationUtils.java | 158 +++++++++++++++++++ .../executiongraph/ArchivedExecution.java | 15 ++ .../ArchivedExecutionJobVertex.java | 15 ++ .../executiongraph/ArchivedExecutionVertex.java | 9 ++ .../flink/runtime/executiongraph/IOMetrics.java | 36 +++-- .../ArchivedExecutionGraphTest.java | 6 +- 43 files changed, 1512 insertions(+), 196 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java index f267e91..700d65f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java @@ -54,6 +54,19 @@ public class ArchivedExecutionConfig implements Serializable { } } + public ArchivedExecutionConfig( + String executionMode, + String restartStrategyDescription, + int parallelism, + boolean objectReuseEnabled, + Map<String, String> globalJobParameters) { + this.executionMode = executionMode; + this.restartStrategyDescription = restartStrategyDescription; + this.parallelism = parallelism; + this.objectReuseEnabled = objectReuseEnabled; + this.globalJobParameters = globalJobParameters; + } + public String getExecutionMode() { return executionMode; } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java index 8486a9c..00cf138 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java @@ -28,6 +28,7 @@ import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; @@ -89,20 +90,20 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler { if (includeRunningJobs && includeFinishedJobs) { gen.writeArrayFieldStart("running"); for (JobDetails detail : result.getRunningJobs()) { - generateSingleJobDetails(detail, gen, now); + writeJobDetailOverviewAsJson(detail, gen, now); } gen.writeEndArray(); gen.writeArrayFieldStart("finished"); for (JobDetails detail : result.getFinishedJobs()) { - generateSingleJobDetails(detail, gen, now); + writeJobDetailOverviewAsJson(detail, gen, now); } gen.writeEndArray(); } else { gen.writeArrayFieldStart("jobs"); for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) { - generateSingleJobDetails(detail, gen, now); + writeJobDetailOverviewAsJson(detail, gen, now); } gen.writeEndArray(); } @@ -120,7 +121,7 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler { } } - private static void generateSingleJobDetails(JobDetails details, JsonGenerator gen, long now) throws Exception { + public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException { gen.writeStartObject(); gen.writeStringField("jid", details.getJobId().toString()); http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java index 49f4c26..6ad024f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java @@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.util.EnvironmentInformation; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; import java.util.TimeZone; @@ -38,29 +39,8 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler { private final String configString; public DashboardConfigHandler(long refreshInterval) { - TimeZone timeZome = TimeZone.getDefault(); - String timeZoneName = timeZome.getDisplayName(); - long timeZoneOffset= timeZome.getRawOffset(); - try { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); - - gen.writeStartObject(); - gen.writeNumberField("refresh-interval", refreshInterval); - gen.writeNumberField("timezone-offset", timeZoneOffset); - gen.writeStringField("timezone-name", timeZoneName); - gen.writeStringField("flink-version", EnvironmentInformation.getVersion()); - - EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation(); - if (revision != null) { - gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate); - } - - gen.writeEndObject(); - - gen.close(); - this.configString = writer.toString(); + this.configString = createConfigJson(refreshInterval); } catch (Exception e) { // should never happen @@ -77,4 +57,30 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler { public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception { return this.configString; } + + public static String createConfigJson(long refreshInterval) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + + TimeZone timeZone = TimeZone.getDefault(); + String timeZoneName = timeZone.getDisplayName(); + long timeZoneOffset = timeZone.getRawOffset(); + + gen.writeStartObject(); + gen.writeNumberField("refresh-interval", refreshInterval); + gen.writeNumberField("timezone-offset", timeZoneOffset); + gen.writeStringField("timezone-name", timeZoneName); + gen.writeStringField("flink-version", EnvironmentInformation.getVersion()); + + EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation(); + if (revision != null) { + gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate); + } + + gen.writeEndObject(); + + gen.close(); + + return writer.toString(); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java index 7664153..dfc654e 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; @@ -44,11 +45,15 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { - StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified(); - + return createJobAccumulatorsJson(graph); + } + + public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified(); + gen.writeStartObject(); gen.writeArrayFieldStart("job-accumulators"); http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java index 459ca2a..7d72235 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.webmonitor.handlers; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; @@ -44,7 +45,10 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { + return createJobConfigJson(graph); + } + public static String createJobConfigJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java index 7780e66..6d1f82f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java @@ -25,13 +25,13 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.IOMetrics; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; -import org.apache.flink.runtime.webmonitor.metrics.MetricStore; +import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; +import javax.annotation.Nullable; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; @@ -64,6 +64,10 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { + return createJobDetailsJson(graph, fetcher); + } + + public static String createJobDetailsJson(AccessExecutionGraph graph, @Nullable MetricFetcher fetcher) throws IOException { final StringWriter writer = new StringWriter(); final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); @@ -153,37 +157,17 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { } gen.writeEndObject(); - long numBytesIn = 0; - long numBytesOut = 0; - long numRecordsIn = 0; - long numRecordsOut = 0; + MutableIOMetrics counts = new MutableIOMetrics(); for (AccessExecutionVertex vertex : ejv.getTaskVertices()) { - IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics(); - - if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph - numBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(); - numBytesOut += ioMetrics.getNumBytesOut(); - numRecordsIn += ioMetrics.getNumRecordsIn(); - numRecordsOut += ioMetrics.getNumRecordsOut(); - } else { // execAttempt is still running, use MetricQueryService instead - fetcher.update(); - MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(graph.getJobID().toString(), ejv.getJobVertexId().toString(), vertex.getParallelSubtaskIndex()); - if (metrics != null) { - numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")); - numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")); - numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")); - numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")); - } - } + counts.addIOMetrics( + vertex.getCurrentExecutionAttempt(), + fetcher, + graph.getJobID().toString(), + ejv.getJobVertexId().toString()); } - gen.writeObjectFieldStart("metrics"); - gen.writeNumberField("read-bytes", numBytesIn); - gen.writeNumberField("write-bytes", numBytesOut); - gen.writeNumberField("read-records", numRecordsIn); - gen.writeNumberField("write-records", numRecordsOut); - gen.writeEndObject(); + counts.writeIOMetricsAsJson(gen); gen.writeEndObject(); } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java index 3720dac..0cce61f 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.util.ExceptionUtils; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; @@ -35,7 +36,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { private static final String JOB_EXCEPTIONS_REST_PATH = "/jobs/:jobid/exceptions"; - private static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; + static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) { super(executionGraphHolder); @@ -48,6 +49,10 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { + return createJobExceptionsJson(graph); + } + + public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); @@ -55,7 +60,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { // most important is the root failure cause String rootException = graph.getFailureCauseAsString(); - if (rootException != null) { + if (rootException != null && !rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { gen.writeStringField("root-exception", rootException); } @@ -67,7 +72,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { for (AccessExecutionVertex task : graph.getAllExecutionVertices()) { String t = task.getFailureCauseAsString(); - if (!t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { + if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { if (numExceptionsSoFar >= MAX_NUMBER_EXCEPTION_TO_REPORT) { truncated = true; break; http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java index ccfcbba..ca0488b 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; @@ -43,11 +44,15 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { - StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified(); - + return createVertexAccumulatorsJson(jobVertex); + } + + public static String createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified(); + gen.writeStartObject(); gen.writeStringField("id", jobVertex.getJobVertexId().toString()); http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java index 0a07896..6e7e47c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java @@ -23,13 +23,13 @@ import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; -import org.apache.flink.runtime.executiongraph.IOMetrics; -import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; -import org.apache.flink.runtime.webmonitor.metrics.MetricStore; +import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; +import javax.annotation.Nullable; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; @@ -55,6 +55,13 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { + return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher); + } + + public static String createVertexDetailsJson( + AccessExecutionJobVertex jobVertex, + String jobID, + @Nullable MetricFetcher fetcher) throws IOException { final long now = System.currentTimeMillis(); StringWriter writer = new StringWriter(); @@ -91,35 +98,16 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { gen.writeNumberField("end-time", endTime); gen.writeNumberField("duration", duration); - IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics(); - - long numBytesIn = 0; - long numBytesOut = 0; - long numRecordsIn = 0; - long numRecordsOut = 0; - - if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph - numBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(); - numBytesOut = ioMetrics.getNumBytesOut(); - numRecordsIn = ioMetrics.getNumRecordsIn(); - numRecordsOut = ioMetrics.getNumRecordsOut(); - } else { // execAttempt is still running, use MetricQueryService instead - fetcher.update(); - MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), jobVertex.getJobVertexId().toString(), vertex.getParallelSubtaskIndex()); - if (metrics != null) { - numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")); - numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")); - numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")); - numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")); - } - } + MutableIOMetrics counts = new MutableIOMetrics(); - gen.writeObjectFieldStart("metrics"); - gen.writeNumberField("read-bytes", numBytesIn); - gen.writeNumberField("write-bytes", numBytesOut); - gen.writeNumberField("read-records", numRecordsIn); - gen.writeNumberField("write-records", numRecordsOut); - gen.writeEndObject(); + counts.addIOMetrics( + vertex.getCurrentExecutionAttempt(), + fetcher, + jobID, + jobVertex.getJobVertexId().toString() + ); + + counts.writeIOMetricsAsJson(gen); gen.writeEndObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java index b3dabea..4fa54bd 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java @@ -23,19 +23,18 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.IOMetrics; -import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; -import org.apache.flink.runtime.webmonitor.metrics.MetricStore; +import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; +import javax.annotation.Nullable; +import java.io.IOException; import java.io.StringWriter; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Map.Entry; /** * A request handler that provides the details of a job vertex, including id, name, and the @@ -59,6 +58,16 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { + return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher); + } + + public static String createVertexDetailsByTaskManagerJson( + AccessExecutionJobVertex jobVertex, + String jobID, + @Nullable MetricFetcher fetcher) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + // Build a map that groups tasks by TaskManager Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>(); @@ -79,8 +88,6 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle // Build JSON response final long now = System.currentTimeMillis(); - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); gen.writeStartObject(); @@ -89,7 +96,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle gen.writeNumberField("now", now); gen.writeArrayFieldStart("taskmanagers"); - for (Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) { + for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) { String host = entry.getKey(); List<AccessExecutionVertex> taskVertices = entry.getValue(); @@ -99,10 +106,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle long endTime = 0; boolean allFinished = true; - long numBytesIn = 0; - long numBytesOut = 0; - long numRecordsIn = 0; - long numRecordsOut = 0; + MutableIOMetrics counts = new MutableIOMetrics(); for (AccessExecutionVertex vertex : taskVertices) { final ExecutionState state = vertex.getExecutionState(); @@ -117,23 +121,11 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle allFinished &= state.isTerminal(); endTime = Math.max(endTime, vertex.getStateTimestamp(state)); - IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics(); - - if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph - numBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(); - numBytesOut += ioMetrics.getNumBytesOut(); - numRecordsIn += ioMetrics.getNumRecordsIn(); - numRecordsOut += ioMetrics.getNumRecordsOut(); - } else { // execAttempt is still running, use MetricQueryService instead - fetcher.update(); - MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), vertex.getParallelSubtaskIndex()); - if (metrics != null) { - numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")); - numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")); - numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")); - numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")); - } - } + counts.addIOMetrics( + vertex.getCurrentExecutionAttempt(), + fetcher, + jobID, + jobVertex.getJobVertexId().toString()); } long duration; @@ -164,12 +156,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle gen.writeNumberField("end-time", endTime); gen.writeNumberField("duration", duration); - gen.writeObjectFieldStart("metrics"); - gen.writeNumberField("read-bytes", numBytesIn); - gen.writeNumberField("write-bytes", numBytesOut); - gen.writeNumberField("read-records", numRecordsIn); - gen.writeNumberField("write-records", numRecordsOut); - gen.writeEndObject(); + counts.writeIOMetricsAsJson(gen); gen.writeObjectFieldStart("status-counts"); for (ExecutionState state : ExecutionState.values()) { http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java index ba3a5ee..a63016c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; import org.apache.flink.runtime.executiongraph.AccessExecution; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; @@ -45,10 +46,14 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA @Override public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception { - final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified(); + return createAttemptAccumulatorsJson(execAttempt); + } + public static String createAttemptAccumulatorsJson(AccessExecution execAttempt) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + + final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified(); gen.writeStartObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java index b753b6e..5af6af9 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java @@ -22,13 +22,13 @@ import com.fasterxml.jackson.core.JsonGenerator; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecution; -import org.apache.flink.runtime.executiongraph.IOMetrics; -import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; -import org.apache.flink.runtime.webmonitor.metrics.MetricStore; +import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics; +import javax.annotation.Nullable; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; @@ -53,6 +53,17 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp @Override public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception { + return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher); + } + + public static String createAttemptDetailsJson( + AccessExecution execAttempt, + String jobID, + String vertexID, + @Nullable MetricFetcher fetcher) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + final ExecutionState status = execAttempt.getState(); final long now = System.currentTimeMillis(); @@ -66,9 +77,6 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp long endTime = status.isTerminal() ? execAttempt.getStateTimestamp(status) : -1; long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1; - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); - gen.writeStartObject(); gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex()); gen.writeStringField("status", status.name()); @@ -78,35 +86,16 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp gen.writeNumberField("end-time", endTime); gen.writeNumberField("duration", duration); - IOMetrics ioMetrics = execAttempt.getIOMetrics(); + MutableIOMetrics counts = new MutableIOMetrics(); - long numBytesIn = 0; - long numBytesOut = 0; - long numRecordsIn = 0; - long numRecordsOut = 0; + counts.addIOMetrics( + execAttempt, + fetcher, + jobID, + vertexID + ); - if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph - numBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(); - numBytesOut = ioMetrics.getNumBytesOut(); - numRecordsIn = ioMetrics.getNumRecordsIn(); - numRecordsOut = ioMetrics.getNumRecordsOut(); - } else { // execAttempt is still running, use MetricQueryService instead - fetcher.update(); - MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), execAttempt.getParallelSubtaskIndex()); - if (metrics != null) { - numBytesIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")); - numBytesOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")); - numRecordsIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")); - numRecordsOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")); - } - } - - gen.writeObjectFieldStart("metrics"); - gen.writeNumberField("read-bytes", numBytesIn); - gen.writeNumberField("write-bytes", numBytesOut); - gen.writeNumberField("read-records", numRecordsIn); - gen.writeNumberField("write-records", numRecordsOut); - gen.writeEndObject(); + counts.writeIOMetricsAsJson(gen); gen.writeEndObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java index 222d474..10a8773 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; @@ -47,6 +48,10 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { + return createSubtasksAccumulatorsJson(jobVertex); + } + + public static String createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java index e2e35e3..08bd722 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; @@ -48,6 +49,10 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { @Override public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception { + return createSubtaskTimesJson(jobVertex); + } + + public static String createSubtaskTimesJson(AccessExecutionJobVertex jobVertex) throws IOException { final long now = System.currentTimeMillis(); StringWriter writer = new StringWriter(); http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java index de40a4a..9976298 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder; import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.webmonitor.handlers.JsonFactory; +import java.io.IOException; import java.io.StringWriter; import java.util.Map; @@ -47,8 +48,11 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { - StringWriter writer = new StringWriter(); + return createCheckpointConfigJson(graph); + } + private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException { + StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); JobSnapshottingSettings settings = graph.getJobSnapshottingSettings(); http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java index e651824..4bbb8f6 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java @@ -76,10 +76,10 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest } } - return writeResponse(checkpoint); + return createCheckpointDetailsJson(checkpoint); } - private String writeResponse(AbstractCheckpointStats checkpoint) throws IOException { + public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); gen.writeStartObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java index 15dd911..b28ecef 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java @@ -36,6 +36,7 @@ import java.io.IOException; import java.io.StringWriter; import java.util.Map; +import static org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -95,19 +96,19 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap } } - return writeResponse(checkpoint, vertexId); - } - - private String writeResponse(AbstractCheckpointStats checkpoint, JobVertexID vertexId) throws IOException { - StringWriter writer = new StringWriter(); - JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); - gen.writeStartObject(); - TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId); if (taskStats == null) { return "{}"; } + + return createSubtaskCheckpointDetailsJson(checkpoint, taskStats); + } + private static String createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, TaskStateStats taskStats) throws IOException { + StringWriter writer = new StringWriter(); + JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); + + gen.writeStartObject(); // Overview gen.writeNumberField("id", checkpoint.getCheckpointId()); gen.writeStringField("status", checkpoint.getStatus().toString()); @@ -188,10 +189,4 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap return writer.toString(); } - private void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException { - gen.writeNumberField("min", minMaxAvg.getMinimum()); - gen.writeNumberField("max", minMaxAvg.getMaximum()); - gen.writeNumberField("avg", minMaxAvg.getAverage()); - } - } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java index 6413806..585ab26 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java @@ -56,6 +56,10 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler @Override public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception { + return createCheckpointStatsJson(graph); + } + + private static String createCheckpointStatsJson(AccessExecutionGraph graph) throws IOException { StringWriter writer = new StringWriter(); JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer); @@ -91,7 +95,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler return writer.toString(); } - private void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException { + private static void writeCounts(JsonGenerator gen, CheckpointStatsCounts counts) throws IOException { gen.writeObjectFieldStart("counts"); gen.writeNumberField("restored", counts.getNumberOfRestoredCheckpoints()); gen.writeNumberField("total", counts.getTotalNumberOfCheckpoints()); @@ -101,7 +105,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler gen.writeEndObject(); } - private void writeSummary( + private static void writeSummary( JsonGenerator gen, CompletedCheckpointStatsSummary summary) throws IOException { gen.writeObjectFieldStart("summary"); @@ -119,13 +123,13 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler gen.writeEndObject(); } - private void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException { + static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException { gen.writeNumberField("min", minMaxAvg.getMinimum()); gen.writeNumberField("max", minMaxAvg.getMaximum()); gen.writeNumberField("avg", minMaxAvg.getAverage()); } - private void writeLatestCheckpoints( + private static void writeLatestCheckpoints( JsonGenerator gen, @Nullable CompletedCheckpointStats completed, @Nullable CompletedCheckpointStats savepoint, @@ -187,7 +191,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler gen.writeEndObject(); } - private void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException { + private static void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats checkpoint) throws IOException { gen.writeNumberField("id", checkpoint.getCheckpointId()); gen.writeNumberField("trigger_timestamp", checkpoint.getTriggerTimestamp()); gen.writeNumberField("latest_ack_timestamp", checkpoint.getLatestAckTimestamp()); @@ -197,7 +201,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler } - private void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException { + private static void writeHistory(JsonGenerator gen, CheckpointStatsHistory history) throws IOException { gen.writeArrayFieldStart("history"); for (AbstractCheckpointStats checkpoint : history.getCheckpoints()) { gen.writeStartObject(); http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java new file mode 100644 index 0000000..32cda7f --- /dev/null +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java @@ -0,0 +1,106 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.utils; + +import com.fasterxml.jackson.core.JsonGenerator; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.metrics.MetricNames; +import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler; +import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher; +import org.apache.flink.runtime.webmonitor.metrics.MetricStore; + +import javax.annotation.Nullable; +import java.io.IOException; + +/** + * This class is a mutable version of the {@link IOMetrics} class that allows adding up IO-related metrics. + * + * For finished jobs these metrics are stored in the {@link ExecutionGraph} as another {@link IOMetrics}. + * For running jobs these metrics are retrieved using the {@link MetricFetcher}. + * + * This class provides a common interface to handle both cases, reducing complexity in various handlers (like + * the {@link JobVertexDetailsHandler}). + */ +public class MutableIOMetrics extends IOMetrics { + + private static final long serialVersionUID = -5460777634971381737L; + + public MutableIOMetrics() { + super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D); + } + + /** + * Adds the IO metrics for the given attempt to this object. If the {@link AccessExecution} is in + * a terminal state the contained {@link IOMetrics} object is added. Otherwise the given {@link MetricFetcher} is + * used to retrieve the required metrics. + * + * @param attempt Attempt whose IO metrics should be added + * @param fetcher MetricFetcher to retrieve metrics for running jobs + * @param jobID JobID to which the attempt belongs + * @param taskID TaskID to which the attempt belongs + */ + public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetcher, String jobID, String taskID) { + if (attempt.getState().isTerminal()) { + IOMetrics ioMetrics = attempt.getIOMetrics(); + if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph + this.numBytesInLocal += ioMetrics.getNumBytesInLocal(); + this.numBytesInRemote += ioMetrics.getNumBytesInRemote(); + this.numBytesOut += ioMetrics.getNumBytesOut(); + this.numRecordsIn += ioMetrics.getNumRecordsIn(); + this.numRecordsOut += ioMetrics.getNumRecordsOut(); + } + } else { // execAttempt is still running, use MetricQueryService instead + if (fetcher != null) { + fetcher.update(); + MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex()); + if (metrics != null) { + this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")); + this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0")); + this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0")); + this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0")); + this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0")); + } + } + } + } + + /** + * Writes the IO metrics contained in this object to the given {@link JsonGenerator}. + * + * The JSON structure written is as follows: + * "metrics": { + * "read-bytes": 1, + * "write-bytes": 2, + * "read-records": 3, + * "write-records": 4 + * } + * + * @param gen JsonGenerator to which the metrics should be written + * @throws IOException + */ + public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException { + gen.writeObjectFieldStart("metrics"); + gen.writeNumberField("read-bytes",this.numBytesInLocal + this.numBytesInRemote); + gen.writeNumberField("write-bytes", this.numBytesOut); + gen.writeNumberField("read-records", this.numRecordsIn); + gen.writeNumberField("write-records", this.numRecordsOut); + gen.writeEndObject(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java index 3207fec..caf6d8e 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java @@ -17,10 +17,18 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.messages.webmonitor.JobDetails; +import org.apache.flink.runtime.webmonitor.WebMonitorUtils; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; +import java.io.StringWriter; import java.util.concurrent.TimeUnit; public class CurrentJobsOverviewHandlerTest { @@ -41,4 +49,38 @@ public class CurrentJobsOverviewHandlerTest { Assert.assertEquals(1, pathsCompleted.length); Assert.assertEquals("/joboverview/completed", pathsCompleted[0]); } + + @Test + public void testJsonGeneration() throws Exception { + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + JobDetails expectedDetails = WebMonitorUtils.createDetailsForJob(originalJob); + StringWriter writer = new StringWriter(); + try (JsonGenerator gen = ArchivedJobGenerationUtils.jacksonFactory.createGenerator(writer)) { + CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 0); + } + String answer = writer.toString(); + + JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(answer); + + Assert.assertEquals(expectedDetails.getJobId().toString(), result.get("jid").asText()); + Assert.assertEquals(expectedDetails.getJobName(), result.get("name").asText()); + Assert.assertEquals(expectedDetails.getStatus().name(), result.get("state").asText()); + + Assert.assertEquals(expectedDetails.getStartTime(), result.get("start-time").asLong()); + Assert.assertEquals(expectedDetails.getEndTime(), result.get("end-time").asLong()); + Assert.assertEquals(expectedDetails.getEndTime() - expectedDetails.getStartTime(), result.get("duration").asLong()); + Assert.assertEquals(expectedDetails.getLastUpdateTime(), result.get("last-modification").asLong()); + + JsonNode tasks = result.get("tasks"); + Assert.assertEquals(expectedDetails.getNumTasks(), tasks.get("total").asInt()); + int[] tasksPerState = expectedDetails.getNumVerticesPerExecutionState(); + Assert.assertEquals( + tasksPerState[ExecutionState.CREATED.ordinal()] + tasksPerState[ExecutionState.SCHEDULED.ordinal()] + tasksPerState[ExecutionState.DEPLOYING.ordinal()], + tasks.get("pending").asInt()); + Assert.assertEquals(tasksPerState[ExecutionState.RUNNING.ordinal()], tasks.get("running").asInt()); + Assert.assertEquals(tasksPerState[ExecutionState.FINISHED.ordinal()], tasks.get("finished").asInt()); + Assert.assertEquals(tasksPerState[ExecutionState.CANCELING.ordinal()], tasks.get("canceling").asInt()); + Assert.assertEquals(tasksPerState[ExecutionState.CANCELED.ordinal()], tasks.get("canceled").asInt()); + Assert.assertEquals(tasksPerState[ExecutionState.FAILED.ordinal()], tasks.get("failed").asInt()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java index aa2d552..9784a06 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java @@ -17,9 +17,14 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; +import java.util.TimeZone; + public class DashboardConfigHandlerTest { @Test public void testGetPaths() { @@ -28,4 +33,21 @@ public class DashboardConfigHandlerTest { Assert.assertEquals(1, paths.length); Assert.assertEquals("/config", paths[0]); } + + @Test + public void testJsonGeneration() throws Exception { + long refreshInterval = 12345; + TimeZone timeZone = TimeZone.getDefault(); + EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation(); + + String json = DashboardConfigHandler.createConfigJson(refreshInterval); + + JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); + + Assert.assertEquals(refreshInterval, result.get("refresh-interval").asLong()); + Assert.assertEquals(timeZone.getDisplayName(), result.get("timezone-name").asText()); + Assert.assertEquals(timeZone.getRawOffset(), result.get("timezone-offset").asLong()); + Assert.assertEquals(EnvironmentInformation.getVersion(), result.get("flink-version").asText()); + Assert.assertEquals(revision.commitId + " @ " + revision.commitDate, result.get("flink-revision").asText()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java index 96c7dd5..34748b7 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java @@ -17,6 +17,10 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; @@ -28,4 +32,20 @@ public class JobAccumulatorsHandlerTest { Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]); } + + @Test + public void testJsonGeneration() throws Exception { + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + String json = JobAccumulatorsHandler.createJobAccumulatorsJson(originalJob); + + JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); + + ArrayNode accs = (ArrayNode) result.get("job-accumulators"); + Assert.assertEquals(0, accs.size()); + + Assert.assertTrue(originalJob.getAccumulatorResultsStringified().length > 0); + ArchivedJobGenerationUtils.compareStringifiedAccumulators( + originalJob.getAccumulatorResultsStringified(), + (ArrayNode) result.get("user-task-accumulators")); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java index 47ea6bf..f304efe 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java @@ -17,9 +17,15 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; +import java.util.Map; + public class JobConfigHandlerTest { @Test public void testGetPaths() { @@ -28,4 +34,29 @@ public class JobConfigHandlerTest { Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/config", paths[0]); } + + public void testJsonGeneration() throws Exception { + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + String answer = JobConfigHandler.createJobConfigJson(originalJob); + + JsonNode job = ArchivedJobGenerationUtils.mapper.readTree(answer); + + Assert.assertEquals(originalJob.getJobID().toString(), job.get("jid").asText()); + Assert.assertEquals(originalJob.getJobName(), job.get("name").asText()); + + ArchivedExecutionConfig originalConfig = originalJob.getArchivedExecutionConfig(); + JsonNode config = job.get("execution-config"); + + Assert.assertEquals(originalConfig.getExecutionMode(), config.get("execution-mode").asText()); + Assert.assertEquals(originalConfig.getRestartStrategyDescription(), config.get("restart-strategy").asText()); + Assert.assertEquals(originalConfig.getParallelism(), config.get("job-parallelism").asInt()); + Assert.assertEquals(originalConfig.getObjectReuseEnabled(), config.get("object-reuse-mode").asBoolean()); + + Map<String, String> originalUserConfig = originalConfig.getGlobalJobParameters(); + JsonNode userConfig = config.get("user-config"); + + for (Map.Entry<String, String> originalEntry : originalUserConfig.entrySet()) { + Assert.assertEquals(originalEntry.getValue(), userConfig.get(originalEntry.getKey()).asText()); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java index b56bd64..3f80d12 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java @@ -17,7 +17,16 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; import com.google.common.collect.Lists; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; @@ -33,4 +42,90 @@ public class JobDetailsHandlerTest { Assert.assertTrue(pathsList.contains("/jobs/:jobid")); Assert.assertTrue(pathsList.contains("/jobs/:jobid/vertices")); } + + @Test + public void testJsonGeneration() throws Exception { + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + String json = JobDetailsHandler.createJobDetailsJson(originalJob, null); + + JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); + + Assert.assertEquals(originalJob.getJobID().toString(), result.get("jid").asText()); + Assert.assertEquals(originalJob.getJobName(), result.get("name").asText()); + Assert.assertEquals(originalJob.isStoppable(), result.get("isStoppable").asBoolean()); + Assert.assertEquals(originalJob.getState().name(), result.get("state").asText()); + + Assert.assertEquals(originalJob.getStatusTimestamp(JobStatus.CREATED), result.get("start-time").asLong()); + Assert.assertEquals(originalJob.getStatusTimestamp(originalJob.getState()), result.get("end-time").asLong()); + Assert.assertEquals( + originalJob.getStatusTimestamp(originalJob.getState()) - originalJob.getStatusTimestamp(JobStatus.CREATED), + result.get("duration").asLong() + ); + + JsonNode timestamps = result.get("timestamps"); + for (JobStatus status : JobStatus.values()) { + Assert.assertEquals(originalJob.getStatusTimestamp(status), timestamps.get(status.name()).asLong()); + } + + ArrayNode tasks = (ArrayNode) result.get("vertices"); + int x = 0; + for (AccessExecutionJobVertex expectedTask : originalJob.getVerticesTopologically()) { + JsonNode task = tasks.get(x); + + Assert.assertEquals(expectedTask.getJobVertexId().toString(), task.get("id").asText()); + Assert.assertEquals(expectedTask.getName(), task.get("name").asText()); + Assert.assertEquals(expectedTask.getParallelism(), task.get("parallelism").asInt()); + Assert.assertEquals(expectedTask.getAggregateState().name(), task.get("status").asText()); + + Assert.assertEquals(3, task.get("start-time").asLong()); + Assert.assertEquals(5, task.get("end-time").asLong()); + Assert.assertEquals(2, task.get("duration").asLong()); + + JsonNode subtasksPerState = task.get("tasks"); + Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CREATED.name()).asInt()); + Assert.assertEquals(0, subtasksPerState.get(ExecutionState.SCHEDULED.name()).asInt()); + Assert.assertEquals(0, subtasksPerState.get(ExecutionState.DEPLOYING.name()).asInt()); + Assert.assertEquals(0, subtasksPerState.get(ExecutionState.RUNNING.name()).asInt()); + Assert.assertEquals(1, subtasksPerState.get(ExecutionState.FINISHED.name()).asInt()); + Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CANCELING.name()).asInt()); + Assert.assertEquals(0, subtasksPerState.get(ExecutionState.CANCELED.name()).asInt()); + Assert.assertEquals(0, subtasksPerState.get(ExecutionState.FAILED.name()).asInt()); + + long expectedNumBytesIn = 0; + long expectedNumBytesOut = 0; + long expectedNumRecordsIn = 0; + long expectedNumRecordsOut = 0; + + for (AccessExecutionVertex vertex : expectedTask.getTaskVertices()) { + IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics(); + + expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(); + expectedNumBytesOut += ioMetrics.getNumBytesOut(); + expectedNumRecordsIn += ioMetrics.getNumRecordsIn(); + expectedNumRecordsOut += ioMetrics.getNumRecordsOut(); + } + + JsonNode metrics = task.get("metrics"); + + Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong()); + Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong()); + Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong()); + Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong()); + + x++; + } + Assert.assertEquals(1, tasks.size()); + + JsonNode statusCounts = result.get("status-counts"); + Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt()); + Assert.assertEquals(1, statusCounts.get(ExecutionState.RUNNING.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.FINISHED.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt()); + + Assert.assertEquals(ArchivedJobGenerationUtils.mapper.readTree(originalJob.getJsonPlan()), result.get("plan")); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java index 850971a..c86ce6a 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java @@ -17,6 +17,13 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; +import org.apache.flink.util.ExceptionUtils; import org.junit.Assert; import org.junit.Test; @@ -28,4 +35,32 @@ public class JobExceptionsHandlerTest { Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]); } + + @Test + public void testJsonGeneration() throws Exception { + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + String json = JobExceptionsHandler.createJobExceptionsJson(originalJob); + + JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); + + Assert.assertEquals(originalJob.getFailureCauseAsString(), result.get("root-exception").asText()); + + ArrayNode exceptions = (ArrayNode) result.get("all-exceptions"); + + int x = 0; + for (AccessExecutionVertex expectedSubtask : originalJob.getAllExecutionVertices()) { + if (!expectedSubtask.getFailureCauseAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) { + JsonNode exception = exceptions.get(x); + + Assert.assertEquals(expectedSubtask.getFailureCauseAsString(), exception.get("exception").asText()); + Assert.assertEquals(expectedSubtask.getTaskNameWithSubtaskIndex(), exception.get("task").asText()); + + TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation(); + String expectedLocationString = location.getFQDNHostname() + ':' + location.dataPort(); + Assert.assertEquals(expectedLocationString, exception.get("location").asText()); + } + x++; + } + Assert.assertEquals(x > JobExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT, result.get("truncated").asBoolean()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java index d513836..03c1896 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java @@ -17,6 +17,11 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; @@ -28,4 +33,19 @@ public class JobVertexAccumulatorsHandlerTest { Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]); } + + @Test + public void testJsonGeneration() throws Exception { + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + String json = JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(originalTask); + + JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); + + Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); + + ArrayNode accs = (ArrayNode) result.get("user-accumulators"); + StringifiedAccumulatorResult[] expectedAccs = originalTask.getAggregatedUserAccumulatorsStringified(); + + ArchivedJobGenerationUtils.compareStringifiedAccumulators(expectedAccs, accs); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java index d20d736..e909c8c 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java @@ -17,6 +17,13 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; @@ -28,4 +35,41 @@ public class JobVertexDetailsHandlerTest { Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]); } + + @Test + public void testJsonGeneration() throws Exception { + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + String json = JobVertexDetailsHandler.createVertexDetailsJson( + originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null); + + JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); + + Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); + Assert.assertEquals(originalTask.getName(), result.get("name").asText()); + Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt()); + Assert.assertTrue(result.get("now").asLong() > 0); + + ArrayNode subtasks = (ArrayNode) result.get("subtasks"); + + Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size()); + for (int x = 0; x < originalTask.getTaskVertices().length; x++) { + AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x]; + JsonNode subtask = subtasks.get(x); + + Assert.assertEquals(x, subtask.get("subtask").asInt()); + Assert.assertEquals(expectedSubtask.getExecutionState().name(), subtask.get("status").asText()); + Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt()); + + TaskManagerLocation location = expectedSubtask.getCurrentAssignedResourceLocation(); + String expectedLocationString = location.getHostname() + ":" + location.dataPort(); + Assert.assertEquals(expectedLocationString, subtask.get("host").asText()); + long start = expectedSubtask.getStateTimestamp(ExecutionState.DEPLOYING); + Assert.assertEquals(start, subtask.get("start-time").asLong()); + long end = expectedSubtask.getStateTimestamp(ExecutionState.FINISHED); + Assert.assertEquals(end, subtask.get("end-time").asLong()); + Assert.assertEquals(end - start, subtask.get("duration").asLong()); + + ArchivedJobGenerationUtils.compareIoMetrics(expectedSubtask.getCurrentExecutionAttempt().getIOMetrics(), subtask.get("metrics")); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java index e56a517..11e35e5 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java @@ -17,6 +17,14 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; @@ -28,4 +36,62 @@ public class JobVertexTaskManagersHandlerTest { Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]); } + + @Test + public void testJsonGeneration() throws Exception { + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + AccessExecutionVertex originalSubtask = ArchivedJobGenerationUtils.getTestSubtask(); + String json = JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson( + originalTask, ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null); + + JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); + + Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); + Assert.assertEquals(originalTask.getName(), result.get("name").asText()); + Assert.assertTrue(result.get("now").asLong() > 0); + + ArrayNode taskmanagers = (ArrayNode) result.get("taskmanagers"); + + JsonNode taskManager = taskmanagers.get(0); + + TaskManagerLocation location = originalSubtask.getCurrentAssignedResourceLocation(); + String expectedLocationString = location.getHostname() + ':' + location.dataPort(); + Assert.assertEquals(expectedLocationString, taskManager.get("host").asText()); + Assert.assertEquals(ExecutionState.FINISHED.name(), taskManager.get("status").asText()); + + Assert.assertEquals(3, taskManager.get("start-time").asLong()); + Assert.assertEquals(5, taskManager.get("end-time").asLong()); + Assert.assertEquals(2, taskManager.get("duration").asLong()); + + JsonNode statusCounts = taskManager.get("status-counts"); + Assert.assertEquals(0, statusCounts.get(ExecutionState.CREATED.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.SCHEDULED.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.DEPLOYING.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.RUNNING.name()).asInt()); + Assert.assertEquals(1, statusCounts.get(ExecutionState.FINISHED.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELING.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.CANCELED.name()).asInt()); + Assert.assertEquals(0, statusCounts.get(ExecutionState.FAILED.name()).asInt()); + + long expectedNumBytesIn = 0; + long expectedNumBytesOut = 0; + long expectedNumRecordsIn = 0; + long expectedNumRecordsOut = 0; + + for (AccessExecutionVertex vertex : originalTask.getTaskVertices()) { + IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics(); + + expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote(); + expectedNumBytesOut += ioMetrics.getNumBytesOut(); + expectedNumRecordsIn += ioMetrics.getNumRecordsIn(); + expectedNumRecordsOut += ioMetrics.getNumRecordsOut(); + } + + JsonNode metrics = taskManager.get("metrics"); + + Assert.assertEquals(expectedNumBytesIn, metrics.get("read-bytes").asLong()); + Assert.assertEquals(expectedNumBytesOut, metrics.get("write-bytes").asLong()); + Assert.assertEquals(expectedNumRecordsIn, metrics.get("read-records").asLong()); + Assert.assertEquals(expectedNumRecordsOut, metrics.get("write-records").asLong()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java index 0b6038f..8d24bd0 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java @@ -17,6 +17,10 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; @@ -28,4 +32,18 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest { Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]); } + + @Test + public void testJsonGeneration() throws Exception { + AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); + String json = SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt); + + JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); + + Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt()); + Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt()); + Assert.assertEquals(originalAttempt.getAttemptId().toString(), result.get("id").asText()); + + ArchivedJobGenerationUtils.compareStringifiedAccumulators(originalAttempt.getUserAccumulatorsStringified(), (ArrayNode) result.get("user-accumulators")); + } }
