[FLINK-5852] Move handler JSON generation code into static methods

This closes #3365.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a552d674
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a552d674
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a552d674

Branch: refs/heads/master
Commit: a552d6746636e26c634c86d6a11732ea9d2f239e
Parents: 999bace
Author: zentol <[email protected]>
Authored: Tue Feb 28 18:01:14 2017 +0100
Committer: zentol <[email protected]>
Committed: Thu Mar 2 11:39:04 2017 +0100

----------------------------------------------------------------------
 .../api/common/ArchivedExecutionConfig.java     |  13 ++
 .../handlers/CurrentJobsOverviewHandler.java    |   9 +-
 .../handlers/DashboardConfigHandler.java        |  50 +++---
 .../handlers/JobAccumulatorsHandler.java        |   9 +-
 .../webmonitor/handlers/JobConfigHandler.java   |   4 +
 .../webmonitor/handlers/JobDetailsHandler.java  |  44 ++----
 .../handlers/JobExceptionsHandler.java          |  11 +-
 .../handlers/JobVertexAccumulatorsHandler.java  |   9 +-
 .../handlers/JobVertexDetailsHandler.java       |  50 +++---
 .../handlers/JobVertexTaskManagersHandler.java  |  55 +++----
 ...taskExecutionAttemptAccumulatorsHandler.java |   7 +-
 .../SubtaskExecutionAttemptDetailsHandler.java  |  55 +++----
 .../SubtasksAllAccumulatorsHandler.java         |   5 +
 .../handlers/SubtasksTimesHandler.java          |   5 +
 .../checkpoints/CheckpointConfigHandler.java    |   6 +-
 .../CheckpointStatsDetailsHandler.java          |   4 +-
 .../CheckpointStatsDetailsSubtasksHandler.java  |  23 ++-
 .../checkpoints/CheckpointStatsHandler.java     |  16 +-
 .../webmonitor/utils/MutableIOMetrics.java      | 106 +++++++++++++
 .../CurrentJobsOverviewHandlerTest.java         |  42 +++++
 .../handlers/DashboardConfigHandlerTest.java    |  22 +++
 .../handlers/JobAccumulatorsHandlerTest.java    |  20 +++
 .../handlers/JobConfigHandlerTest.java          |  31 ++++
 .../handlers/JobDetailsHandlerTest.java         |  95 +++++++++++
 .../handlers/JobExceptionsHandlerTest.java      |  35 ++++
 .../JobVertexAccumulatorsHandlerTest.java       |  20 +++
 .../handlers/JobVertexDetailsHandlerTest.java   |  44 ++++++
 .../JobVertexTaskManagersHandlerTest.java       |  66 ++++++++
 ...ExecutionAttemptAccumulatorsHandlerTest.java |  18 +++
 ...btaskExecutionAttemptDetailsHandlerTest.java |  29 ++++
 .../SubtasksAllAccumulatorsHandlerTest.java     |  32 ++++
 .../handlers/SubtasksTimesHandlerTest.java      |  37 +++++
 .../utils/ArchivedExecutionBuilder.java         | 146 +++++++++++++++++
 .../utils/ArchivedExecutionConfigBuilder.java   |  67 ++++++++
 .../utils/ArchivedExecutionGraphBuilder.java    | 135 ++++++++++++++++
 .../ArchivedExecutionJobVertexBuilder.java      |  80 ++++++++++
 .../utils/ArchivedExecutionVertexBuilder.java   |  69 ++++++++
 .../utils/ArchivedJobGenerationUtils.java       | 158 +++++++++++++++++++
 .../executiongraph/ArchivedExecution.java       |  15 ++
 .../ArchivedExecutionJobVertex.java             |  15 ++
 .../executiongraph/ArchivedExecutionVertex.java |   9 ++
 .../flink/runtime/executiongraph/IOMetrics.java |  36 +++--
 .../ArchivedExecutionGraphTest.java             |   6 +-
 43 files changed, 1512 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
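
Every handler change below follows the same pattern: the instance method keeps only the request plumbing and delegates to a new static create*Json/write*AsJson method, which the new *HandlerTest classes can exercise without an ExecutionGraphHolder, ActorGateway or running web monitor. A minimal sketch of that test-side usage, assuming a prepared AccessExecutionGraph; the class name and the checked field are illustrative, not taken from this commit:

    // Illustrative sketch only -- calls one of the extracted static methods directly.
    import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    import org.apache.flink.runtime.webmonitor.handlers.JobConfigHandler;

    public class JobConfigJsonSketch {
        public static void check(AccessExecutionGraph graph) throws Exception {
            // No handler instance or actor system is needed any more.
            String json = JobConfigHandler.createJobConfigJson(graph);

            // Assumption: the job id appears in the generated config JSON.
            if (!json.contains(graph.getJobID().toString())) {
                throw new AssertionError("expected the job id in the config JSON");
            }
        }
    }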


http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
index f267e91..700d65f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ArchivedExecutionConfig.java
@@ -54,6 +54,19 @@ public class ArchivedExecutionConfig implements Serializable {
                }
        }
 
+       public ArchivedExecutionConfig(
+                       String executionMode,
+                       String restartStrategyDescription,
+                       int parallelism,
+                       boolean objectReuseEnabled,
+                       Map<String, String> globalJobParameters) {
+               this.executionMode = executionMode;
+               this.restartStrategyDescription = restartStrategyDescription;
+               this.parallelism = parallelism;
+               this.objectReuseEnabled = objectReuseEnabled;
+               this.globalJobParameters = globalJobParameters;
+       }
+
        public String getExecutionMode() {
                return executionMode;
        }
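
The new constructor allows an ArchivedExecutionConfig to be built directly from plain values, which the ArchivedExecutionConfigBuilder added to the tests can use. A small sketch with arbitrary sample values (not taken from this commit):

    import java.util.Collections;
    import org.apache.flink.api.common.ArchivedExecutionConfig;

    class ArchivedExecutionConfigSample {
        // Arbitrary sample values, only to show the argument order.
        static ArchivedExecutionConfig sample() {
            return new ArchivedExecutionConfig(
                "PIPELINED",                              // executionMode
                "fixed-delay",                            // restartStrategyDescription
                4,                                        // parallelism
                false,                                    // objectReuseEnabled
                Collections.<String, String>emptyMap());  // globalJobParameters
        }
    }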

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
index 8486a9c..00cf138 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandler.java
@@ -28,6 +28,7 @@ import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -89,20 +90,20 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
                                if (includeRunningJobs && includeFinishedJobs) {
                                        gen.writeArrayFieldStart("running");
                                        for (JobDetails detail : result.getRunningJobs()) {
-                                               generateSingleJobDetails(detail, gen, now);
+                                               writeJobDetailOverviewAsJson(detail, gen, now);
                                        }
                                        gen.writeEndArray();
 
                                        gen.writeArrayFieldStart("finished");
                                        for (JobDetails detail : result.getFinishedJobs()) {
-                                               generateSingleJobDetails(detail, gen, now);
+                                               writeJobDetailOverviewAsJson(detail, gen, now);
                                        }
                                        gen.writeEndArray();
                                }
                                else {
                                        gen.writeArrayFieldStart("jobs");
                                        for (JobDetails detail : includeRunningJobs ? result.getRunningJobs() : result.getFinishedJobs()) {
-                                               generateSingleJobDetails(detail, gen, now);
+                                               writeJobDetailOverviewAsJson(detail, gen, now);
                                        }
                                        gen.writeEndArray();
                                }
@@ -120,7 +121,7 @@ public class CurrentJobsOverviewHandler extends AbstractJsonRequestHandler {
                }
        }
 
-       private static void generateSingleJobDetails(JobDetails details, JsonGenerator gen, long now) throws Exception {
+       public static void writeJobDetailOverviewAsJson(JobDetails details, JsonGenerator gen, long now) throws IOException {
                gen.writeStartObject();
 
                gen.writeStringField("jid", details.getJobId().toString());

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
index 49f4c26..6ad024f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandler.java
@@ -22,6 +22,7 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 import java.util.TimeZone;
@@ -38,29 +39,8 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
        private final String configString;
        
        public DashboardConfigHandler(long refreshInterval) {
-               TimeZone timeZome = TimeZone.getDefault();
-               String timeZoneName = timeZome.getDisplayName();
-               long timeZoneOffset= timeZome.getRawOffset();
-
                try {
-                       StringWriter writer = new StringWriter();
-                       JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
-       
-                       gen.writeStartObject();
-                       gen.writeNumberField("refresh-interval", refreshInterval);
-                       gen.writeNumberField("timezone-offset", timeZoneOffset);
-                       gen.writeStringField("timezone-name", timeZoneName);
-                       gen.writeStringField("flink-version", EnvironmentInformation.getVersion());
-
-                       EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
-                       if (revision != null) {
-                               gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate);
-                       }
-
-                       gen.writeEndObject();
-       
-                       gen.close();
-                       this.configString = writer.toString();
+                       this.configString = createConfigJson(refreshInterval);
                }
                catch (Exception e) {
                        // should never happen
@@ -77,4 +57,30 @@ public class DashboardConfigHandler extends AbstractJsonRequestHandler {
        public String handleJsonRequest(Map<String, String> pathParams, Map<String, String> queryParams, ActorGateway jobManager) throws Exception {
                return this.configString;
        }
+
+       public static String createConfigJson(long refreshInterval) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+       
+               TimeZone timeZone = TimeZone.getDefault();
+               String timeZoneName = timeZone.getDisplayName();
+               long timeZoneOffset = timeZone.getRawOffset();
+
+               gen.writeStartObject();
+               gen.writeNumberField("refresh-interval", refreshInterval);
+               gen.writeNumberField("timezone-offset", timeZoneOffset);
+               gen.writeStringField("timezone-name", timeZoneName);
+               gen.writeStringField("flink-version", EnvironmentInformation.getVersion());
+
+               EnvironmentInformation.RevisionInformation revision = EnvironmentInformation.getRevisionInformation();
+               if (revision != null) {
+                       gen.writeStringField("flink-revision", revision.commitId + " @ " + revision.commitDate);
+               }
+
+               gen.writeEndObject();
+
+               gen.close();
+
+               return writer.toString();
+       }
 }
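
With the generation code extracted into createConfigJson, the dashboard configuration JSON can be produced and inspected without constructing a DashboardConfigHandler. A sketch of what a caller such as the new DashboardConfigHandlerTest could do; parsing with a plain Jackson ObjectMapper is only one option, and the refresh interval value is made up:

    import com.fasterxml.jackson.databind.JsonNode;
    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.flink.runtime.webmonitor.handlers.DashboardConfigHandler;

    class DashboardConfigJsonSketch {
        static void check() throws Exception {
            String json = DashboardConfigHandler.createConfigJson(5000L);

            // Fields written by createConfigJson, as shown in the diff above.
            JsonNode root = new ObjectMapper().readTree(json);
            assert root.get("refresh-interval").asLong() == 5000L;
            assert root.has("timezone-offset");
            assert root.has("timezone-name");
            assert root.has("flink-version");
        }
    }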

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
index 7664153..dfc654e 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandler.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -44,11 +45,15 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandle
 
        @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-               StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
-               
+               return createJobAccumulatorsJson(graph);
+       }
+
+       public static String createJobAccumulatorsJson(AccessExecutionGraph graph) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
+               StringifiedAccumulatorResult[] allAccumulators = graph.getAccumulatorResultsStringified();
+
                gen.writeStartObject();
 
                gen.writeArrayFieldStart("job-accumulators");

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
index 459ca2a..7d72235 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -44,7 +45,10 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler {
 
        @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+               return createJobConfigJson(graph);
+       }
 
+       public static String createJobConfigJson(AccessExecutionGraph graph) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
index 7780e66..6d1f82f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandler.java
@@ -25,13 +25,13 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
+import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -64,6 +64,10 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
 
        @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+               return createJobDetailsJson(graph, fetcher);
+       }
+
+       public static String createJobDetailsJson(AccessExecutionGraph graph, @Nullable MetricFetcher fetcher) throws IOException {
                final StringWriter writer = new StringWriter();
                final JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
@@ -153,37 +157,17 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler {
                        }
                        gen.writeEndObject();
                        
-                       long numBytesIn = 0;
-                       long numBytesOut = 0;
-                       long numRecordsIn = 0;
-                       long numRecordsOut = 0;
+                       MutableIOMetrics counts = new MutableIOMetrics();
 
                        for (AccessExecutionVertex vertex : ejv.getTaskVertices()) {
-                               IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
-
-                               if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
-                                       numBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
-                                       numBytesOut += ioMetrics.getNumBytesOut();
-                                       numRecordsIn += ioMetrics.getNumRecordsIn();
-                                       numRecordsOut += ioMetrics.getNumRecordsOut();
-                               } else { // execAttempt is still running, use MetricQueryService instead
-                                       fetcher.update();
-                                       MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(graph.getJobID().toString(), ejv.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
-                                       if (metrics != null) {
-                                               numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
-                                               numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
-                                               numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
-                                               numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
-                                       }
-                               }
+                               counts.addIOMetrics(
+                                       vertex.getCurrentExecutionAttempt(),
+                                       fetcher,
+                                       graph.getJobID().toString(),
+                                       ejv.getJobVertexId().toString());
                        }
 
-                       gen.writeObjectFieldStart("metrics");
-                       gen.writeNumberField("read-bytes", numBytesIn);
-                       gen.writeNumberField("write-bytes", numBytesOut);
-                       gen.writeNumberField("read-records", numRecordsIn);
-                       gen.writeNumberField("write-records", numRecordsOut);
-                       gen.writeEndObject();
+                       counts.writeIOMetricsAsJson(gen);
                        
                        gen.writeEndObject();
                }
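
The aggregation logic removed here (and from the other two detail handlers below) now lives in the new MutableIOMetrics class under flink-runtime-web/.../webmonitor/utils/. Its source is not shown in this excerpt; the following is a rough sketch reconstructed from the removed inline code and the call sites, so field names, visibility and the null-fetcher guard are assumptions:

    import java.io.IOException;
    import com.fasterxml.jackson.core.JsonGenerator;
    import org.apache.flink.runtime.executiongraph.AccessExecution;
    import org.apache.flink.runtime.executiongraph.IOMetrics;
    import org.apache.flink.runtime.metrics.MetricNames;
    import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
    import org.apache.flink.runtime.webmonitor.metrics.MetricStore;

    // Sketch only; the class added by this commit may differ in details.
    public class MutableIOMetrics {
        private long numBytesIn;
        private long numBytesOut;
        private long numRecordsIn;
        private long numRecordsOut;

        public void addIOMetrics(AccessExecution attempt, MetricFetcher fetcher, String jobID, String taskID) {
            IOMetrics ioMetrics = attempt.getIOMetrics();
            if (ioMetrics != null) { // finished attempt: use the final metrics stored in the ExecutionGraph
                this.numBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
                this.numBytesOut += ioMetrics.getNumBytesOut();
                this.numRecordsIn += ioMetrics.getNumRecordsIn();
                this.numRecordsOut += ioMetrics.getNumRecordsOut();
            } else if (fetcher != null) { // running attempt: ask the MetricQueryService (fetcher is @Nullable at the call sites)
                fetcher.update();
                MetricStore.SubtaskMetricStore metrics =
                    fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
                if (metrics != null) {
                    this.numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"))
                        + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
                    this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
                    this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
                    this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
                }
            }
        }

        public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException {
            // Same four fields the handlers used to write inline.
            gen.writeObjectFieldStart("metrics");
            gen.writeNumberField("read-bytes", numBytesIn);
            gen.writeNumberField("write-bytes", numBytesOut);
            gen.writeNumberField("read-records", numRecordsIn);
            gen.writeNumberField("write-records", numRecordsOut);
            gen.writeEndObject();
        }
    }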

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
index 3720dac..0cce61f 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandler.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.util.ExceptionUtils;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -35,7 +36,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 
        private static final String JOB_EXCEPTIONS_REST_PATH = "/jobs/:jobid/exceptions";
 
-       private static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
+       static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20;
        
        public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder) {
                super(executionGraphHolder);
@@ -48,6 +49,10 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
 
        @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
+               return createJobExceptionsJson(graph);
+       }
+
+       public static String createJobExceptionsJson(AccessExecutionGraph graph) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
@@ -55,7 +60,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
                
                // most important is the root failure cause
                String rootException = graph.getFailureCauseAsString();
-               if (rootException != null) {
+               if (rootException != null && !rootException.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
                        gen.writeStringField("root-exception", rootException);
                }
 
@@ -67,7 +72,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler {
                
                for (AccessExecutionVertex task : graph.getAllExecutionVertices()) {
                        String t = task.getFailureCauseAsString();
-                       if (!t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
+                       if (t != null && !t.equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION)) {
                                if (numExceptionsSoFar >= MAX_NUMBER_EXCEPTION_TO_REPORT) {
                                        truncated = true;
                                        break;
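
Besides becoming static, createJobExceptionsJson now also skips failure causes that are merely the stringified null exception, for the root cause as well as for the per-task causes. An illustrative check of that behaviour; the method name used to obtain the graph and the expected string are assumptions:

    import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
    import org.apache.flink.runtime.webmonitor.handlers.JobExceptionsHandler;

    class JobExceptionsJsonSketch {
        // graphWithoutFailure is assumed to report ExceptionUtils.STRINGIFIED_NULL_EXCEPTION
        // as its failure cause, e.g. a graph built with the new ArchivedExecutionGraphBuilder.
        static void check(AccessExecutionGraph graphWithoutFailure) throws Exception {
            String json = JobExceptionsHandler.createJobExceptionsJson(graphWithoutFailure);
            assert !json.contains("root-exception") : "null exceptions should no longer be reported";
        }
    }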

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
index ccfcbba..ca0488b 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandler.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -43,11 +44,15 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle
 
        @Override
        public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
-               StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
-               
+               return createVertexAccumulatorsJson(jobVertex);
+       }
+
+       public static String createVertexAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
+               StringifiedAccumulatorResult[] accs = jobVertex.getAggregatedUserAccumulatorsStringified();
+
                gen.writeStartObject();
                gen.writeStringField("id", jobVertex.getJobVertexId().toString());
                

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
index 0a07896..6e7e47c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandler.java
@@ -23,13 +23,13 @@ import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
+import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -55,6 +55,13 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
 
        @Override
        public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+               return createVertexDetailsJson(jobVertex, params.get("jobid"), fetcher);
+       }
+
+       public static String createVertexDetailsJson(
+                       AccessExecutionJobVertex jobVertex,
+                       String jobID,
+                       @Nullable MetricFetcher fetcher) throws IOException {
                final long now = System.currentTimeMillis();
                
                StringWriter writer = new StringWriter();
@@ -91,35 +98,16 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler {
                        gen.writeNumberField("end-time", endTime);
                        gen.writeNumberField("duration", duration);
 
-                       IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
-
-                       long numBytesIn = 0;
-                       long numBytesOut = 0;
-                       long numRecordsIn = 0;
-                       long numRecordsOut = 0;
-
-                       if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
-                               numBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
-                               numBytesOut = ioMetrics.getNumBytesOut();
-                               numRecordsIn = ioMetrics.getNumRecordsIn();
-                               numRecordsOut = ioMetrics.getNumRecordsOut();
-                       } else { // execAttempt is still running, use MetricQueryService instead
-                               fetcher.update();
-                               MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), jobVertex.getJobVertexId().toString(), vertex.getParallelSubtaskIndex());
-                               if (metrics != null) {
-                                       numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
-                                       numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
-                                       numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
-                                       numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
-                                       }
-                               }
+                       MutableIOMetrics counts = new MutableIOMetrics();
 
-                       gen.writeObjectFieldStart("metrics");
-                       gen.writeNumberField("read-bytes", numBytesIn);
-                       gen.writeNumberField("write-bytes", numBytesOut);
-                       gen.writeNumberField("read-records", numRecordsIn);
-                       gen.writeNumberField("write-records", numRecordsOut);
-                       gen.writeEndObject();
+                       counts.addIOMetrics(
+                               vertex.getCurrentExecutionAttempt(),
+                               fetcher,
+                               jobID,
+                               jobVertex.getJobVertexId().toString()
+                       );
+
+                       counts.writeIOMetricsAsJson(gen);
                        
                        gen.writeEndObject();
                        

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
index b3dabea..4fa54bd 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandler.java
@@ -23,19 +23,18 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
+import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 
 /**
  * A request handler that provides the details of a job vertex, including id, name, and the
@@ -59,6 +58,16 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
 
        @Override
        public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+               return createVertexDetailsByTaskManagerJson(jobVertex, params.get("jobid"), fetcher);
+       }
+
+       public static String createVertexDetailsByTaskManagerJson(
+                       AccessExecutionJobVertex jobVertex,
+                       String jobID,
+                       @Nullable MetricFetcher fetcher) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
                // Build a map that groups tasks by TaskManager
                Map<String, List<AccessExecutionVertex>> taskManagerVertices = new HashMap<>();
 
@@ -79,8 +88,6 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
                // Build JSON response
                final long now = System.currentTimeMillis();
 
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 
                gen.writeStartObject();
 
@@ -89,7 +96,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
                gen.writeNumberField("now", now);
 
                gen.writeArrayFieldStart("taskmanagers");
-               for (Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
+               for (Map.Entry<String, List<AccessExecutionVertex>> entry : taskManagerVertices.entrySet()) {
                        String host = entry.getKey();
                        List<AccessExecutionVertex> taskVertices = entry.getValue();
 
@@ -99,10 +106,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
                        long endTime = 0;
                        boolean allFinished = true;
 
-                       long numBytesIn = 0;
-                       long numBytesOut = 0;
-                       long numRecordsIn = 0;
-                       long numRecordsOut = 0;
+                       MutableIOMetrics counts = new MutableIOMetrics();
 
                        for (AccessExecutionVertex vertex : taskVertices) {
                                final ExecutionState state = vertex.getExecutionState();
@@ -117,23 +121,11 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
                                allFinished &= state.isTerminal();
                                endTime = Math.max(endTime, vertex.getStateTimestamp(state));
 
-                               IOMetrics ioMetrics = vertex.getCurrentExecutionAttempt().getIOMetrics();
-
-                               if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
-                                       numBytesIn += ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
-                                       numBytesOut += ioMetrics.getNumBytesOut();
-                                       numRecordsIn += ioMetrics.getNumRecordsIn();
-                                       numRecordsOut += ioMetrics.getNumRecordsOut();
-                               } else { // execAttempt is still running, use MetricQueryService instead
-                                       fetcher.update();
-                                       MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), vertex.getParallelSubtaskIndex());
-                                       if (metrics != null) {
-                                               numBytesIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
-                                               numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
-                                               numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
-                                               numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
-                                       }
-                               }
+                               counts.addIOMetrics(
+                                       vertex.getCurrentExecutionAttempt(),
+                                       fetcher,
+                                       jobID,
+                                       jobVertex.getJobVertexId().toString());
                        }
 
                        long duration;
@@ -164,12 +156,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle
                        gen.writeNumberField("end-time", endTime);
                        gen.writeNumberField("duration", duration);
 
-                       gen.writeObjectFieldStart("metrics");
-                       gen.writeNumberField("read-bytes", numBytesIn);
-                       gen.writeNumberField("write-bytes", numBytesOut);
-                       gen.writeNumberField("read-records", numRecordsIn);
-                       gen.writeNumberField("write-records", numRecordsOut);
-                       gen.writeEndObject();
+                       counts.writeIOMetricsAsJson(gen);
 
                        gen.writeObjectFieldStart("status-counts");
                        for (ExecutionState state : ExecutionState.values()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
index ba3a5ee..a63016c 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandler.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -45,10 +46,14 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA
 
        @Override
        public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
-               final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
+               return createAttemptAccumulatorsJson(execAttempt);
+       }
                
+       public static String createAttemptAccumulatorsJson(AccessExecution execAttempt) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+               
+               final StringifiedAccumulatorResult[] accs = execAttempt.getUserAccumulatorsStringified();
 
                gen.writeStartObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
index b753b6e..5af6af9 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandler.java
@@ -22,13 +22,13 @@ import com.fasterxml.jackson.core.JsonGenerator;
 
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.AccessExecution;
-import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
-import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
+import org.apache.flink.runtime.webmonitor.utils.MutableIOMetrics;
 
+import javax.annotation.Nullable;
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -53,6 +53,17 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
 
        @Override
        public String handleRequest(AccessExecution execAttempt, Map<String, String> params) throws Exception {
+               return createAttemptDetailsJson(execAttempt, params.get("jobid"), params.get("vertexid"), fetcher);
+       }
+
+       public static String createAttemptDetailsJson(
+                       AccessExecution execAttempt,
+                       String jobID,
+                       String vertexID,
+                       @Nullable MetricFetcher fetcher) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
                final ExecutionState status = execAttempt.getState();
                final long now = System.currentTimeMillis();
 
@@ -66,9 +77,6 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
                long endTime = status.isTerminal() ? execAttempt.getStateTimestamp(status) : -1;
                long duration = startTime > 0 ? ((endTime > 0 ? endTime : now) - startTime) : -1;
 
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
-
                gen.writeStartObject();
                gen.writeNumberField("subtask", execAttempt.getParallelSubtaskIndex());
                gen.writeStringField("status", status.name());
@@ -78,35 +86,16 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp
                gen.writeNumberField("end-time", endTime);
                gen.writeNumberField("duration", duration);
 
-               IOMetrics ioMetrics = execAttempt.getIOMetrics();
+               MutableIOMetrics counts = new MutableIOMetrics();
 
-               long numBytesIn = 0;
-               long numBytesOut = 0;
-               long numRecordsIn = 0;
-               long numRecordsOut = 0;
+               counts.addIOMetrics(
+                       execAttempt,
+                       fetcher,
+                       jobID,
+                       vertexID
+               );
                
-               if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
-                       numBytesIn = ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
-                       numBytesOut = ioMetrics.getNumBytesOut();
-                       numRecordsIn = ioMetrics.getNumRecordsIn();
-                       numRecordsOut = ioMetrics.getNumRecordsOut();
-               } else { // execAttempt is still running, use MetricQueryService instead
-                       fetcher.update();
-                       MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(params.get("jobid"), params.get("vertexid"), execAttempt.getParallelSubtaskIndex());
-                       if (metrics != null) {
-                               numBytesIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0")) + Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
-                               numBytesOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
-                               numRecordsIn = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
-                               numRecordsOut = Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
-                       }
-               }
-               
-               gen.writeObjectFieldStart("metrics");
-               gen.writeNumberField("read-bytes", numBytesIn);
-               gen.writeNumberField("write-bytes", numBytesOut);
-               gen.writeNumberField("read-records", numRecordsIn);
-               gen.writeNumberField("write-records", numRecordsOut);
-               gen.writeEndObject();
+               counts.writeIOMetricsAsJson(gen);
 
                gen.writeEndObject();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
index 222d474..10a8773 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandler.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -47,6 +48,10 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand
 
        @Override
        public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+               return createSubtasksAccumulatorsJson(jobVertex);
+       }
+
+       public static String createSubtasksAccumulatorsJson(AccessExecutionJobVertex jobVertex) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
index e2e35e3..08bd722 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandler.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -48,6 +49,10 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler {
 
        @Override
        public String handleRequest(AccessExecutionJobVertex jobVertex, Map<String, String> params) throws Exception {
+               return createSubtaskTimesJson(jobVertex);
+       }
+
+       public static String createSubtaskTimesJson(AccessExecutionJobVertex jobVertex) throws IOException {
                final long now = System.currentTimeMillis();
 
                StringWriter writer = new StringWriter();

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
index de40a4a..9976298 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointConfigHandler.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.webmonitor.ExecutionGraphHolder;
 import org.apache.flink.runtime.webmonitor.handlers.AbstractExecutionGraphRequestHandler;
 import org.apache.flink.runtime.webmonitor.handlers.JsonFactory;
 
+import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
@@ -47,8 +48,11 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle
 
        @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, String> params) throws Exception {
-               StringWriter writer = new StringWriter();
+               return createCheckpointConfigJson(graph);
+       }
 
+       private static String createCheckpointConfigJson(AccessExecutionGraph graph) throws IOException {
+               StringWriter writer = new StringWriter();
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
                JobSnapshottingSettings settings = graph.getJobSnapshottingSettings();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
index e651824..4bbb8f6 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsHandler.java
@@ -76,10 +76,10 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest
                        }
                }
 
-               return writeResponse(checkpoint);
+               return createCheckpointDetailsJson(checkpoint);
        }
 
-       private String writeResponse(AbstractCheckpointStats checkpoint) throws IOException {
+       public static String createCheckpointDetailsJson(AbstractCheckpointStats checkpoint) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
                gen.writeStartObject();

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
index 15dd911..b28ecef 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsDetailsSubtasksHandler.java
@@ -36,6 +36,7 @@ import java.io.IOException;
 import java.io.StringWriter;
 import java.util.Map;
 
+import static org.apache.flink.runtime.webmonitor.handlers.checkpoints.CheckpointStatsHandler.writeMinMaxAvg;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -95,19 +96,19 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
                        }
                }
 
-               return writeResponse(checkpoint, vertexId);
-       }
-
-       private String writeResponse(AbstractCheckpointStats checkpoint, JobVertexID vertexId) throws IOException {
-               StringWriter writer = new StringWriter();
-               JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
-               gen.writeStartObject();
-
                TaskStateStats taskStats = checkpoint.getTaskStateStats(vertexId);
                if (taskStats == null) {
                        return "{}";
                }
+               
+               return createSubtaskCheckpointDetailsJson(checkpoint, taskStats);
+       }
 
+       private static String createSubtaskCheckpointDetailsJson(AbstractCheckpointStats checkpoint, TaskStateStats taskStats) throws IOException {
+               StringWriter writer = new StringWriter();
+               JsonGenerator gen = JsonFactory.jacksonFactory.createGenerator(writer);
+
+               gen.writeStartObject();
                // Overview
                gen.writeNumberField("id", checkpoint.getCheckpointId());
                gen.writeStringField("status", checkpoint.getStatus().toString());
@@ -188,10 +189,4 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap
                return writer.toString();
        }
 
-       private void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) throws IOException {
-               gen.writeNumberField("min", minMaxAvg.getMinimum());
-               gen.writeNumberField("max", minMaxAvg.getMaximum());
-               gen.writeNumberField("avg", minMaxAvg.getAverage());
-       }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
index 6413806..585ab26 100644
--- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/checkpoints/CheckpointStatsHandler.java
@@ -56,6 +56,10 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler
 
        @Override
        public String handleRequest(AccessExecutionGraph graph, Map<String, 
String> params) throws Exception {
+               return createCheckpointStatsJson(graph);
+       }
+
+       private static String createCheckpointStatsJson(AccessExecutionGraph 
graph) throws IOException {
                StringWriter writer = new StringWriter();
                JsonGenerator gen = 
JsonFactory.jacksonFactory.createGenerator(writer);
 
@@ -91,7 +95,7 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
                return writer.toString();
        }
 
-       private void writeCounts(JsonGenerator gen, CheckpointStatsCounts 
counts) throws IOException {
+       private static void writeCounts(JsonGenerator gen, 
CheckpointStatsCounts counts) throws IOException {
                gen.writeObjectFieldStart("counts");
                gen.writeNumberField("restored", 
counts.getNumberOfRestoredCheckpoints());
                gen.writeNumberField("total", 
counts.getTotalNumberOfCheckpoints());
@@ -101,7 +105,7 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
                gen.writeEndObject();
        }
 
-       private void writeSummary(
+       private static void writeSummary(
                JsonGenerator gen,
                CompletedCheckpointStatsSummary summary) throws IOException {
                gen.writeObjectFieldStart("summary");
@@ -119,13 +123,13 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
                gen.writeEndObject();
        }
 
-       private void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats 
minMaxAvg) throws IOException {
+       static void writeMinMaxAvg(JsonGenerator gen, MinMaxAvgStats minMaxAvg) 
throws IOException {
                gen.writeNumberField("min", minMaxAvg.getMinimum());
                gen.writeNumberField("max", minMaxAvg.getMaximum());
                gen.writeNumberField("avg", minMaxAvg.getAverage());
        }
 
-       private void writeLatestCheckpoints(
+       private static void writeLatestCheckpoints(
                JsonGenerator gen,
                @Nullable CompletedCheckpointStats completed,
                @Nullable CompletedCheckpointStats savepoint,
@@ -187,7 +191,7 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
                gen.writeEndObject();
        }
 
-       private void writeCheckpoint(JsonGenerator gen, AbstractCheckpointStats 
checkpoint) throws IOException {
+       private static void writeCheckpoint(JsonGenerator gen, 
AbstractCheckpointStats checkpoint) throws IOException {
                gen.writeNumberField("id", checkpoint.getCheckpointId());
                gen.writeNumberField("trigger_timestamp", 
checkpoint.getTriggerTimestamp());
                gen.writeNumberField("latest_ack_timestamp", 
checkpoint.getLatestAckTimestamp());
@@ -197,7 +201,7 @@ public class CheckpointStatsHandler extends 
AbstractExecutionGraphRequestHandler
 
        }
 
-       private void writeHistory(JsonGenerator gen, CheckpointStatsHistory 
history) throws IOException {
+       private static void writeHistory(JsonGenerator gen, 
CheckpointStatsHistory history) throws IOException {
                gen.writeArrayFieldStart("history");
                for (AbstractCheckpointStats checkpoint : 
history.getCheckpoints()) {
                        gen.writeStartObject();

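[Editor's note: the practical payoff of making writeMinMaxAvg (and the other write* helpers) static is that the JSON generation can now be unit-tested without spinning up a handler or the web backend. The sketch below is purely illustrative and is not part of this commit; the test class name is invented, and it assumes Mockito is on the test classpath and that MinMaxAvgStats can be mocked.]

package org.apache.flink.runtime.webmonitor.handlers.checkpoints;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.runtime.checkpoint.MinMaxAvgStats;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;

import java.io.StringWriter;

// Hypothetical test, not part of this commit: exercises the now-static helper directly.
// It lives in the handler's package because writeMinMaxAvg is package-private.
public class WriteMinMaxAvgSketchTest {

        @Test
        public void testWriteMinMaxAvg() throws Exception {
                // Assumes MinMaxAvgStats is mockable; a hand-built instance would work as well.
                MinMaxAvgStats stats = Mockito.mock(MinMaxAvgStats.class);
                Mockito.when(stats.getMinimum()).thenReturn(1L);
                Mockito.when(stats.getMaximum()).thenReturn(5L);
                Mockito.when(stats.getAverage()).thenReturn(3L);

                StringWriter writer = new StringWriter();
                JsonGenerator gen = new JsonFactory().createGenerator(writer);
                gen.writeStartObject();
                // the package-private static method introduced by this commit
                CheckpointStatsHandler.writeMinMaxAvg(gen, stats);
                gen.writeEndObject();
                gen.close();

                JsonNode result = new ObjectMapper().readTree(writer.toString());
                Assert.assertEquals(1L, result.get("min").asLong());
                Assert.assertEquals(5L, result.get("max").asLong());
                Assert.assertEquals(3L, result.get("avg").asLong());
        }
}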
http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
new file mode 100644
index 0000000..32cda7f
--- /dev/null
+++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/utils/MutableIOMetrics.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.metrics.MetricNames;
+import org.apache.flink.runtime.webmonitor.handlers.JobVertexDetailsHandler;
+import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;
+import org.apache.flink.runtime.webmonitor.metrics.MetricStore;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+
+/**
+ * This class is a mutable version of the {@link IOMetrics} class that allows adding up IO-related metrics.
+ *
+ * For finished jobs these metrics are stored in the {@link ExecutionGraph} as another {@link IOMetrics}.
+ * For running jobs these metrics are retrieved using the {@link MetricFetcher}.
+ *
+ * This class provides a common interface to handle both cases, reducing complexity in various handlers (like
+ * the {@link JobVertexDetailsHandler}).
+ */
+public class MutableIOMetrics extends IOMetrics {
+
+        private static final long serialVersionUID = -5460777634971381737L;
+
+        public MutableIOMetrics() {
+                super(0, 0, 0, 0, 0, 0.0D, 0.0D, 0.0D, 0.0D, 0.0D);
+        }
+
+        /**
+         * Adds the IO metrics for the given attempt to this object. If the {@link AccessExecution} is in
+         * a terminal state the contained {@link IOMetrics} object is added. Otherwise the given {@link MetricFetcher} is
+         * used to retrieve the required metrics.
+         *
+         * @param attempt Attempt whose IO metrics should be added
+         * @param fetcher MetricFetcher to retrieve metrics for running jobs
+         * @param jobID JobID to which the attempt belongs
+         * @param taskID TaskID to which the attempt belongs
+         */
+        public void addIOMetrics(AccessExecution attempt, @Nullable MetricFetcher fetcher, String jobID, String taskID) {
+                if (attempt.getState().isTerminal()) {
+                        IOMetrics ioMetrics = attempt.getIOMetrics();
+                        if (ioMetrics != null) { // execAttempt is already finished, use final metrics stored in ExecutionGraph
+                                this.numBytesInLocal += ioMetrics.getNumBytesInLocal();
+                                this.numBytesInRemote += ioMetrics.getNumBytesInRemote();
+                                this.numBytesOut += ioMetrics.getNumBytesOut();
+                                this.numRecordsIn += ioMetrics.getNumRecordsIn();
+                                this.numRecordsOut += ioMetrics.getNumRecordsOut();
+                        }
+                } else { // execAttempt is still running, use MetricQueryService instead
+                        if (fetcher != null) {
+                                fetcher.update();
+                                MetricStore.SubtaskMetricStore metrics = fetcher.getMetricStore().getSubtaskMetricStore(jobID, taskID, attempt.getParallelSubtaskIndex());
+                                if (metrics != null) {
+                                        this.numBytesInLocal += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_LOCAL, "0"));
+                                        this.numBytesInRemote += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_IN_REMOTE, "0"));
+                                        this.numBytesOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_BYTES_OUT, "0"));
+                                        this.numRecordsIn += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_IN, "0"));
+                                        this.numRecordsOut += Long.valueOf(metrics.getMetric(MetricNames.IO_NUM_RECORDS_OUT, "0"));
+                                }
+                        }
+                }
+        }
+
+        /**
+         * Writes the IO metrics contained in this object to the given {@link JsonGenerator}.
+         *
+         * The JSON structure written is as follows:
+         * "metrics": {
+         *     "read-bytes": 1,
+         *     "write-bytes": 2,
+         *     "read-records": 3,
+         *     "write-records": 4
+         * }
+         *
+         * @param gen JsonGenerator to which the metrics should be written
+         * @throws IOException
+         */
+        public void writeIOMetricsAsJson(JsonGenerator gen) throws IOException {
+                gen.writeObjectFieldStart("metrics");
+                gen.writeNumberField("read-bytes", this.numBytesInLocal + this.numBytesInRemote);
+                gen.writeNumberField("write-bytes", this.numBytesOut);
+                gen.writeNumberField("read-records", this.numRecordsIn);
+                gen.writeNumberField("write-records", this.numRecordsOut);
+                gen.writeEndObject();
+        }
+}
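
[Editor's note: to make the intent of MutableIOMetrics concrete, here is a minimal usage sketch, not part of this commit. The helper class and method names are invented for illustration; the sketch only assumes the Flink classes referenced in the diff above, and that the JsonGenerator is currently positioned inside a JSON object.]

package org.apache.flink.runtime.webmonitor.utils;

import com.fasterxml.jackson.core.JsonGenerator;
import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
import org.apache.flink.runtime.webmonitor.metrics.MetricFetcher;

import javax.annotation.Nullable;
import java.io.IOException;

// Hypothetical helper, not part of this commit: shows how a handler can aggregate
// per-subtask IO metrics for one job vertex and emit the "metrics" object.
final class MutableIOMetricsUsageSketch {

        static void writeVertexMetrics(
                        AccessExecutionJobVertex task,
                        @Nullable MetricFetcher fetcher,
                        String jobID,
                        JsonGenerator gen) throws IOException {

                MutableIOMetrics summed = new MutableIOMetrics();
                for (AccessExecutionVertex vertex : task.getTaskVertices()) {
                        // Finished attempts contribute their archived IOMetrics;
                        // running attempts are looked up through the MetricFetcher.
                        summed.addIOMetrics(vertex.getCurrentExecutionAttempt(), fetcher, jobID, task.getJobVertexId().toString());
                }
                // Writes "metrics": {"read-bytes": ..., "write-bytes": ..., "read-records": ..., "write-records": ...}
                summed.writeIOMetricsAsJson(gen);
        }

        private MutableIOMetricsUsageSketch() {
        }
}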

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
index 3207fec..caf6d8e 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/CurrentJobsOverviewHandlerTest.java
@@ -17,10 +17,18 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.webmonitor.JobDetails;
+import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 import scala.concurrent.duration.FiniteDuration;
 
+import java.io.StringWriter;
 import java.util.concurrent.TimeUnit;
 
 public class CurrentJobsOverviewHandlerTest {
@@ -41,4 +49,38 @@ public class CurrentJobsOverviewHandlerTest {
                Assert.assertEquals(1, pathsCompleted.length);
                Assert.assertEquals("/joboverview/completed", 
pathsCompleted[0]);
        }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               JobDetails expectedDetails = 
WebMonitorUtils.createDetailsForJob(originalJob);
+               StringWriter writer = new StringWriter();
+               try (JsonGenerator gen = 
ArchivedJobGenerationUtils.jacksonFactory.createGenerator(writer)) {
+                       
CurrentJobsOverviewHandler.writeJobDetailOverviewAsJson(expectedDetails, gen, 
0);
+               }
+               String answer = writer.toString();
+
+               JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(answer);
+
+               Assert.assertEquals(expectedDetails.getJobId().toString(), 
result.get("jid").asText());
+               Assert.assertEquals(expectedDetails.getJobName(), 
result.get("name").asText());
+               Assert.assertEquals(expectedDetails.getStatus().name(), 
result.get("state").asText());
+
+               Assert.assertEquals(expectedDetails.getStartTime(), 
result.get("start-time").asLong());
+               Assert.assertEquals(expectedDetails.getEndTime(), 
result.get("end-time").asLong());
+               Assert.assertEquals(expectedDetails.getEndTime() - 
expectedDetails.getStartTime(), result.get("duration").asLong());
+               Assert.assertEquals(expectedDetails.getLastUpdateTime(), 
result.get("last-modification").asLong());
+
+               JsonNode tasks = result.get("tasks");
+               Assert.assertEquals(expectedDetails.getNumTasks(), 
tasks.get("total").asInt());
+               int[] tasksPerState = 
expectedDetails.getNumVerticesPerExecutionState();
+               Assert.assertEquals(
+                       tasksPerState[ExecutionState.CREATED.ordinal()] + 
tasksPerState[ExecutionState.SCHEDULED.ordinal()] + 
tasksPerState[ExecutionState.DEPLOYING.ordinal()],
+                       tasks.get("pending").asInt());
+               
Assert.assertEquals(tasksPerState[ExecutionState.RUNNING.ordinal()], 
tasks.get("running").asInt());
+               
Assert.assertEquals(tasksPerState[ExecutionState.FINISHED.ordinal()], 
tasks.get("finished").asInt());
+               
Assert.assertEquals(tasksPerState[ExecutionState.CANCELING.ordinal()], 
tasks.get("canceling").asInt());
+               
Assert.assertEquals(tasksPerState[ExecutionState.CANCELED.ordinal()], 
tasks.get("canceled").asInt());
+               
Assert.assertEquals(tasksPerState[ExecutionState.FAILED.ordinal()], 
tasks.get("failed").asInt());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
index aa2d552..9784a06 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/DashboardConfigHandlerTest.java
@@ -17,9 +17,14 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.TimeZone;
+
 public class DashboardConfigHandlerTest {
        @Test
        public void testGetPaths() {
@@ -28,4 +33,21 @@ public class DashboardConfigHandlerTest {
                Assert.assertEquals(1, paths.length);
                Assert.assertEquals("/config", paths[0]);
        }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               long refreshInterval = 12345;
+               TimeZone timeZone = TimeZone.getDefault();
+               EnvironmentInformation.RevisionInformation revision = 
EnvironmentInformation.getRevisionInformation();
+
+               String json = 
DashboardConfigHandler.createConfigJson(refreshInterval);
+
+               JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
+
+               Assert.assertEquals(refreshInterval, 
result.get("refresh-interval").asLong());
+               Assert.assertEquals(timeZone.getDisplayName(), 
result.get("timezone-name").asText());
+               Assert.assertEquals(timeZone.getRawOffset(), 
result.get("timezone-offset").asLong());
+               Assert.assertEquals(EnvironmentInformation.getVersion(), 
result.get("flink-version").asText());
+               Assert.assertEquals(revision.commitId + " @ " + 
revision.commitDate, result.get("flink-revision").asText());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
index 96c7dd5..34748b7 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobAccumulatorsHandlerTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +32,20 @@ public class JobAccumulatorsHandlerTest {
                Assert.assertEquals(1, paths.length);
                Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]);
        }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               String json = 
JobAccumulatorsHandler.createJobAccumulatorsJson(originalJob);
+
+               JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
+
+               ArrayNode accs = (ArrayNode) result.get("job-accumulators");
+               Assert.assertEquals(0, accs.size());
+
+               
Assert.assertTrue(originalJob.getAccumulatorResultsStringified().length > 0);
+               ArchivedJobGenerationUtils.compareStringifiedAccumulators(
+                       originalJob.getAccumulatorResultsStringified(),
+                       (ArrayNode) result.get("user-task-accumulators"));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
index 47ea6bf..f304efe 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobConfigHandlerTest.java
@@ -17,9 +17,15 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Map;
+
 public class JobConfigHandlerTest {
        @Test
        public void testGetPaths() {
@@ -28,4 +34,29 @@ public class JobConfigHandlerTest {
                Assert.assertEquals(1, paths.length);
                Assert.assertEquals("/jobs/:jobid/config", paths[0]);
        }
+
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               String answer = 
JobConfigHandler.createJobConfigJson(originalJob);
+
+               JsonNode job = 
ArchivedJobGenerationUtils.mapper.readTree(answer);
+
+               Assert.assertEquals(originalJob.getJobID().toString(), 
job.get("jid").asText());
+               Assert.assertEquals(originalJob.getJobName(), 
job.get("name").asText());
+
+               ArchivedExecutionConfig originalConfig = 
originalJob.getArchivedExecutionConfig();
+               JsonNode config = job.get("execution-config");
+
+               Assert.assertEquals(originalConfig.getExecutionMode(), 
config.get("execution-mode").asText());
+               
Assert.assertEquals(originalConfig.getRestartStrategyDescription(), 
config.get("restart-strategy").asText());
+               Assert.assertEquals(originalConfig.getParallelism(), 
config.get("job-parallelism").asInt());
+               Assert.assertEquals(originalConfig.getObjectReuseEnabled(), 
config.get("object-reuse-mode").asBoolean());
+
+               Map<String, String> originalUserConfig = 
originalConfig.getGlobalJobParameters();
+               JsonNode userConfig = config.get("user-config");
+
+               for (Map.Entry<String, String> originalEntry : 
originalUserConfig.entrySet()) {
+                       Assert.assertEquals(originalEntry.getValue(), 
userConfig.get(originalEntry.getKey()).asText());
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
index b56bd64..3f80d12 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobDetailsHandlerTest.java
@@ -17,7 +17,16 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.google.common.collect.Lists;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -33,4 +42,90 @@ public class JobDetailsHandlerTest {
                Assert.assertTrue(pathsList.contains("/jobs/:jobid"));
                Assert.assertTrue(pathsList.contains("/jobs/:jobid/vertices"));
        }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               String json = 
JobDetailsHandler.createJobDetailsJson(originalJob, null);
+
+               JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
+
+               Assert.assertEquals(originalJob.getJobID().toString(), 
result.get("jid").asText());
+               Assert.assertEquals(originalJob.getJobName(), 
result.get("name").asText());
+               Assert.assertEquals(originalJob.isStoppable(), 
result.get("isStoppable").asBoolean());
+               Assert.assertEquals(originalJob.getState().name(), 
result.get("state").asText());
+
+               
Assert.assertEquals(originalJob.getStatusTimestamp(JobStatus.CREATED), 
result.get("start-time").asLong());
+               
Assert.assertEquals(originalJob.getStatusTimestamp(originalJob.getState()), 
result.get("end-time").asLong());
+               Assert.assertEquals(
+                       originalJob.getStatusTimestamp(originalJob.getState()) 
- originalJob.getStatusTimestamp(JobStatus.CREATED),
+                       result.get("duration").asLong()
+               );
+
+               JsonNode timestamps = result.get("timestamps");
+               for (JobStatus status : JobStatus.values()) {
+                       
Assert.assertEquals(originalJob.getStatusTimestamp(status), 
timestamps.get(status.name()).asLong());
+               }
+
+               ArrayNode tasks = (ArrayNode) result.get("vertices");
+               int x = 0;
+               for (AccessExecutionJobVertex expectedTask : 
originalJob.getVerticesTopologically()) {
+                       JsonNode task = tasks.get(x);
+
+                       
Assert.assertEquals(expectedTask.getJobVertexId().toString(), 
task.get("id").asText());
+                       Assert.assertEquals(expectedTask.getName(), 
task.get("name").asText());
+                       Assert.assertEquals(expectedTask.getParallelism(), 
task.get("parallelism").asInt());
+                       
Assert.assertEquals(expectedTask.getAggregateState().name(), 
task.get("status").asText());
+
+                       Assert.assertEquals(3, task.get("start-time").asLong());
+                       Assert.assertEquals(5, task.get("end-time").asLong());
+                       Assert.assertEquals(2, task.get("duration").asLong());
+
+                       JsonNode subtasksPerState = task.get("tasks");
+                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.CREATED.name()).asInt());
+                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.SCHEDULED.name()).asInt());
+                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.DEPLOYING.name()).asInt());
+                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.RUNNING.name()).asInt());
+                       Assert.assertEquals(1, 
subtasksPerState.get(ExecutionState.FINISHED.name()).asInt());
+                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.CANCELING.name()).asInt());
+                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.CANCELED.name()).asInt());
+                       Assert.assertEquals(0, 
subtasksPerState.get(ExecutionState.FAILED.name()).asInt());
+
+                       long expectedNumBytesIn = 0;
+                       long expectedNumBytesOut = 0;
+                       long expectedNumRecordsIn = 0;
+                       long expectedNumRecordsOut = 0;
+
+                       for (AccessExecutionVertex vertex : 
expectedTask.getTaskVertices()) {
+                               IOMetrics ioMetrics = 
vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+                               expectedNumBytesIn += 
ioMetrics.getNumBytesInLocal() + ioMetrics.getNumBytesInRemote();
+                               expectedNumBytesOut += 
ioMetrics.getNumBytesOut();
+                               expectedNumRecordsIn += 
ioMetrics.getNumRecordsIn();
+                               expectedNumRecordsOut += 
ioMetrics.getNumRecordsOut();
+                       }
+
+                       JsonNode metrics = task.get("metrics");
+
+                       Assert.assertEquals(expectedNumBytesIn, 
metrics.get("read-bytes").asLong());
+                       Assert.assertEquals(expectedNumBytesOut, 
metrics.get("write-bytes").asLong());
+                       Assert.assertEquals(expectedNumRecordsIn, 
metrics.get("read-records").asLong());
+                       Assert.assertEquals(expectedNumRecordsOut, 
metrics.get("write-records").asLong());
+
+                       x++;
+               }
+               Assert.assertEquals(1, tasks.size());
+
+               JsonNode statusCounts = result.get("status-counts");
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.CREATED.name()).asInt());
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.SCHEDULED.name()).asInt());
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.DEPLOYING.name()).asInt());
+               Assert.assertEquals(1, 
statusCounts.get(ExecutionState.RUNNING.name()).asInt());
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.FINISHED.name()).asInt());
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.CANCELING.name()).asInt());
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.CANCELED.name()).asInt());
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.FAILED.name()).asInt());
+
+               
Assert.assertEquals(ArchivedJobGenerationUtils.mapper.readTree(originalJob.getJsonPlan()),
 result.get("plan"));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
index 850971a..c86ce6a 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobExceptionsHandlerTest.java
@@ -17,6 +17,13 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
+import org.apache.flink.util.ExceptionUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +35,32 @@ public class JobExceptionsHandlerTest {
                Assert.assertEquals(1, paths.length);
                Assert.assertEquals("/jobs/:jobid/exceptions", paths[0]);
        }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionGraph originalJob = 
ArchivedJobGenerationUtils.getTestJob();
+               String json = 
JobExceptionsHandler.createJobExceptionsJson(originalJob);
+
+               JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
+
+               Assert.assertEquals(originalJob.getFailureCauseAsString(), 
result.get("root-exception").asText());
+
+               ArrayNode exceptions = (ArrayNode) result.get("all-exceptions");
+
+               int x = 0;
+               for (AccessExecutionVertex expectedSubtask : 
originalJob.getAllExecutionVertices()) {
+                       if 
(!expectedSubtask.getFailureCauseAsString().equals(ExceptionUtils.STRINGIFIED_NULL_EXCEPTION))
 {
+                               JsonNode exception = exceptions.get(x);
+
+                               
Assert.assertEquals(expectedSubtask.getFailureCauseAsString(), 
exception.get("exception").asText());
+                               
Assert.assertEquals(expectedSubtask.getTaskNameWithSubtaskIndex(), 
exception.get("task").asText());
+
+                               TaskManagerLocation location = 
expectedSubtask.getCurrentAssignedResourceLocation();
+                               String expectedLocationString = 
location.getFQDNHostname() + ':' + location.dataPort();
+                               Assert.assertEquals(expectedLocationString, 
exception.get("location").asText());
+                       }
+                       x++;
+               }
+               Assert.assertEquals(x > 
JobExceptionsHandler.MAX_NUMBER_EXCEPTION_TO_REPORT, 
result.get("truncated").asBoolean());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
index d513836..03c1896 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexAccumulatorsHandlerTest.java
@@ -17,6 +17,11 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +33,19 @@ public class JobVertexAccumulatorsHandlerTest {
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]);
        }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
+               String json = 
JobVertexAccumulatorsHandler.createVertexAccumulatorsJson(originalTask);
+
+               JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
+
+               Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());
+
+               ArrayNode accs = (ArrayNode) result.get("user-accumulators");
+               StringifiedAccumulatorResult[] expectedAccs = 
originalTask.getAggregatedUserAccumulatorsStringified();
+
+               
ArchivedJobGenerationUtils.compareStringifiedAccumulators(expectedAccs, accs);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
index d20d736..e909c8c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexDetailsHandlerTest.java
@@ -17,6 +17,13 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +35,41 @@ public class JobVertexDetailsHandlerTest {
                Assert.assertEquals(1, paths.length);
                Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", 
paths[0]);
        }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
+               String json = JobVertexDetailsHandler.createVertexDetailsJson(
+                       originalTask, 
ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
+
+               JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
+
+               Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());
+               Assert.assertEquals(originalTask.getName(), 
result.get("name").asText());
+               Assert.assertEquals(originalTask.getParallelism(), 
result.get("parallelism").asInt());
+               Assert.assertTrue(result.get("now").asLong() > 0);
+
+               ArrayNode subtasks = (ArrayNode) result.get("subtasks");
+
+               Assert.assertEquals(originalTask.getTaskVertices().length, 
subtasks.size());
+               for (int x = 0; x < originalTask.getTaskVertices().length; x++) 
{
+                       AccessExecutionVertex expectedSubtask = 
originalTask.getTaskVertices()[x];
+                       JsonNode subtask = subtasks.get(x);
+
+                       Assert.assertEquals(x, subtask.get("subtask").asInt());
+                       
Assert.assertEquals(expectedSubtask.getExecutionState().name(), 
subtask.get("status").asText());
+                       
Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(),
 subtask.get("attempt").asInt());
+
+                       TaskManagerLocation location = 
expectedSubtask.getCurrentAssignedResourceLocation();
+                       String expectedLocationString = location.getHostname() 
+ ":" + location.dataPort();
+                       Assert.assertEquals(expectedLocationString, 
subtask.get("host").asText());
+                       long start = 
expectedSubtask.getStateTimestamp(ExecutionState.DEPLOYING);
+                       Assert.assertEquals(start, 
subtask.get("start-time").asLong());
+                       long end = 
expectedSubtask.getStateTimestamp(ExecutionState.FINISHED);
+                       Assert.assertEquals(end, 
subtask.get("end-time").asLong());
+                       Assert.assertEquals(end - start, 
subtask.get("duration").asLong());
+
+                       
ArchivedJobGenerationUtils.compareIoMetrics(expectedSubtask.getCurrentExecutionAttempt().getIOMetrics(),
 subtask.get("metrics"));
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
index e56a517..11e35e5 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JobVertexTaskManagersHandlerTest.java
@@ -17,6 +17,14 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +36,62 @@ public class JobVertexTaskManagersHandlerTest {
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]);
        }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionJobVertex originalTask = 
ArchivedJobGenerationUtils.getTestTask();
+               AccessExecutionVertex originalSubtask = 
ArchivedJobGenerationUtils.getTestSubtask();
+               String json = 
JobVertexTaskManagersHandler.createVertexDetailsByTaskManagerJson(
+                       originalTask, 
ArchivedJobGenerationUtils.getTestJob().getJobID().toString(), null);
+
+               JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
+
+               Assert.assertEquals(originalTask.getJobVertexId().toString(), 
result.get("id").asText());
+               Assert.assertEquals(originalTask.getName(), 
result.get("name").asText());
+               Assert.assertTrue(result.get("now").asLong() > 0);
+
+               ArrayNode taskmanagers = (ArrayNode) result.get("taskmanagers");
+
+               JsonNode taskManager = taskmanagers.get(0);
+
+               TaskManagerLocation location = 
originalSubtask.getCurrentAssignedResourceLocation();
+               String expectedLocationString = location.getHostname() + ':' + 
location.dataPort();
+               Assert.assertEquals(expectedLocationString, 
taskManager.get("host").asText());
+               Assert.assertEquals(ExecutionState.FINISHED.name(), 
taskManager.get("status").asText());
+
+               Assert.assertEquals(3, taskManager.get("start-time").asLong());
+               Assert.assertEquals(5, taskManager.get("end-time").asLong());
+               Assert.assertEquals(2, taskManager.get("duration").asLong());
+
+               JsonNode statusCounts = taskManager.get("status-counts");
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.CREATED.name()).asInt());
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.SCHEDULED.name()).asInt());
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.DEPLOYING.name()).asInt());
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.RUNNING.name()).asInt());
+               Assert.assertEquals(1, 
statusCounts.get(ExecutionState.FINISHED.name()).asInt());
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.CANCELING.name()).asInt());
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.CANCELED.name()).asInt());
+               Assert.assertEquals(0, 
statusCounts.get(ExecutionState.FAILED.name()).asInt());
+
+               long expectedNumBytesIn = 0;
+               long expectedNumBytesOut = 0;
+               long expectedNumRecordsIn = 0;
+               long expectedNumRecordsOut = 0;
+
+               for (AccessExecutionVertex vertex : 
originalTask.getTaskVertices()) {
+                       IOMetrics ioMetrics = 
vertex.getCurrentExecutionAttempt().getIOMetrics();
+
+                       expectedNumBytesIn += ioMetrics.getNumBytesInLocal() + 
ioMetrics.getNumBytesInRemote();
+                       expectedNumBytesOut += ioMetrics.getNumBytesOut();
+                       expectedNumRecordsIn += ioMetrics.getNumRecordsIn();
+                       expectedNumRecordsOut += ioMetrics.getNumRecordsOut();
+               }
+
+               JsonNode metrics = taskManager.get("metrics");
+
+               Assert.assertEquals(expectedNumBytesIn, 
metrics.get("read-bytes").asLong());
+               Assert.assertEquals(expectedNumBytesOut, 
metrics.get("write-bytes").asLong());
+               Assert.assertEquals(expectedNumRecordsIn, 
metrics.get("read-records").asLong());
+               Assert.assertEquals(expectedNumRecordsOut, 
metrics.get("write-records").asLong());
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
index 0b6038f..8d24bd0 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptAccumulatorsHandlerTest.java
@@ -17,6 +17,10 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +32,18 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest {
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators",
 paths[0]);
        }
+
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecution originalAttempt = 
ArchivedJobGenerationUtils.getTestAttempt();
+               String json = 
SubtaskExecutionAttemptAccumulatorsHandler.createAttemptAccumulatorsJson(originalAttempt);
+
+               JsonNode result = 
ArchivedJobGenerationUtils.mapper.readTree(json);
+
+               Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), 
result.get("subtask").asInt());
+               Assert.assertEquals(originalAttempt.getAttemptNumber(), 
result.get("attempt").asInt());
+               Assert.assertEquals(originalAttempt.getAttemptId().toString(), 
result.get("id").asText());
+
+               
ArchivedJobGenerationUtils.compareStringifiedAccumulators(originalAttempt.getUserAccumulatorsStringified(),
 (ArrayNode) result.get("user-accumulators"));
+       }
 }