Repository: flink
Updated Branches:
  refs/heads/master 51b7ede28 -> a552d6746


http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
index e9c9f84..54f3f9c 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java
@@ -17,6 +17,12 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +34,27 @@ public class SubtaskExecutionAttemptDetailsHandlerTest {
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt",
 paths[0]);
        }
+
+       /**
+        * Renders the attempt-details JSON for the test attempt supplied by
+        * {@link ArchivedJobGenerationUtils} and verifies every field against the values
+        * reported by the attempt itself.
+        */
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionGraph job = ArchivedJobGenerationUtils.getTestJob();
+               AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();
+               AccessExecution attempt = ArchivedJobGenerationUtils.getTestAttempt();
+
+               String jobId = job.getJobID().toString();
+               String vertexId = task.getJobVertexId().toString();
+               String json = SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson(
+                       attempt, jobId, vertexId, null);
+
+               JsonNode root = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+               // identity of the attempt
+               Assert.assertEquals(attempt.getParallelSubtaskIndex(), root.get("subtask").asInt());
+               Assert.assertEquals(attempt.getState().name(), root.get("status").asText());
+               Assert.assertEquals(attempt.getAttemptNumber(), root.get("attempt").asInt());
+               Assert.assertEquals(
+                       attempt.getAssignedResourceLocation().getHostname(),
+                       root.get("host").asText());
+
+               // timing: the JSON start/end times are compared with the DEPLOYING/FINISHED timestamps
+               long deployingTimestamp = attempt.getStateTimestamp(ExecutionState.DEPLOYING);
+               Assert.assertEquals(deployingTimestamp, root.get("start-time").asLong());
+               long finishedTimestamp = attempt.getStateTimestamp(ExecutionState.FINISHED);
+               Assert.assertEquals(finishedTimestamp, root.get("end-time").asLong());
+               Assert.assertEquals(finishedTimestamp - deployingTimestamp, root.get("duration").asLong());
+
+               ArchivedJobGenerationUtils.compareIoMetrics(attempt.getIOMetrics(), root.get("metrics"));
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
index 1efb260..954ebad 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java
@@ -17,6 +17,11 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +33,31 @@ public class SubtasksAllAccumulatorsHandlerTest {
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", 
paths[0]);
        }
+
+       /**
+        * Renders the per-subtask accumulators JSON for the test task supplied by
+        * {@link ArchivedJobGenerationUtils} and compares the task header and every subtask
+        * entry with the corresponding execution vertex.
+        */
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();
+               String json = SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(task);
+
+               JsonNode root = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+               Assert.assertEquals(task.getJobVertexId().toString(), root.get("id").asText());
+               Assert.assertEquals(task.getParallelism(), root.get("parallelism").asInt());
+
+               ArrayNode actualSubtasks = (ArrayNode) root.get("subtasks");
+               AccessExecutionVertex[] expectedSubtasks = task.getTaskVertices();
+
+               Assert.assertEquals(expectedSubtasks.length, actualSubtasks.size());
+
+               int index = 0;
+               for (AccessExecutionVertex expected : expectedSubtasks) {
+                       JsonNode actual = actualSubtasks.get(index);
+
+                       // entries are expected in vertex order, so the position doubles as the subtask index
+                       Assert.assertEquals(index, actual.get("subtask").asInt());
+                       Assert.assertEquals(
+                               expected.getCurrentExecutionAttempt().getAttemptNumber(),
+                               actual.get("attempt").asInt());
+                       Assert.assertEquals(
+                               expected.getCurrentAssignedResourceLocation().getHostname(),
+                               actual.get("host").asText());
+
+                       ArchivedJobGenerationUtils.compareStringifiedAccumulators(
+                               expected.getCurrentExecutionAttempt().getUserAccumulatorsStringified(),
+                               (ArrayNode) actual.get("user-accumulators"));
+
+                       index++;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
index 1c8d2bd..939f439 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java
@@ -17,6 +17,12 @@
  */
 package org.apache.flink.runtime.webmonitor.handlers;
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -28,4 +34,35 @@ public class SubtasksTimesHandlerTest {
                Assert.assertEquals(1, paths.length);
                
Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]);
        }
+
+       /**
+        * Renders the subtask-times JSON for the test task supplied by
+        * {@link ArchivedJobGenerationUtils} and checks the task header, the first subtask
+        * entry and each of its per-state timestamps against the test attempt.
+        */
+       @Test
+       public void testJsonGeneration() throws Exception {
+               AccessExecutionJobVertex task = ArchivedJobGenerationUtils.getTestTask();
+               AccessExecution attempt = ArchivedJobGenerationUtils.getTestAttempt();
+               String json = SubtasksTimesHandler.createSubtaskTimesJson(task);
+
+               JsonNode root = ArchivedJobGenerationUtils.mapper.readTree(json);
+
+               Assert.assertEquals(task.getJobVertexId().toString(), root.get("id").asText());
+               Assert.assertEquals(task.getName(), root.get("name").asText());
+               Assert.assertTrue(root.get("now").asLong() > 0L);
+
+               ArrayNode subtasks = (ArrayNode) root.get("subtasks");
+
+               // only the first subtask entry is inspected
+               JsonNode firstSubtask = subtasks.get(0);
+               Assert.assertEquals(0, firstSubtask.get("subtask").asInt());
+               Assert.assertEquals(
+                       attempt.getAssignedResourceLocation().getHostname(),
+                       firstSubtask.get("host").asText());
+
+               // expected duration: timestamp of the attempt's current state minus its SCHEDULED timestamp
+               long expectedDuration = attempt.getStateTimestamp(attempt.getState())
+                       - attempt.getStateTimestamp(ExecutionState.SCHEDULED);
+               Assert.assertEquals(expectedDuration, firstSubtask.get("duration").asLong());
+
+               JsonNode timestamps = firstSubtask.get("timestamps");
+
+               // each timestamp is looked up under the state's enum name
+               ExecutionState[] checkedStates = {
+                       ExecutionState.CREATED,
+                       ExecutionState.SCHEDULED,
+                       ExecutionState.DEPLOYING,
+                       ExecutionState.RUNNING,
+                       ExecutionState.FINISHED,
+                       ExecutionState.CANCELING,
+                       ExecutionState.CANCELED,
+                       ExecutionState.FAILED
+               };
+               for (ExecutionState checkedState : checkedStates) {
+                       Assert.assertEquals(
+                               attempt.getStateTimestamp(checkedState),
+                               timestamps.get(checkedState.name()).asLong());
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
new file mode 100644
index 0000000..98fc92d
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.MeterView;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.Preconditions;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+/**
+ * Test utility that assembles {@link ArchivedExecution} instances.
+ *
+ * <p>Every reference-typed property that has not been set explicitly is replaced by a
+ * default in {@link #build()}; the attempt number and the parallel subtask index default
+ * to {@code 0}.
+ */
+public class ArchivedExecutionBuilder {
+       private ExecutionAttemptID attemptId;
+       private long[] stateTimestamps;
+       private int attemptNumber;
+       private ExecutionState state;
+       private String failureCause;
+       private TaskManagerLocation assignedResourceLocation;
+       private StringifiedAccumulatorResult[] userAccumulators;
+       private IOMetrics ioMetrics;
+       private int parallelSubtaskIndex;
+
+       /** Sets the attempt id; if unset, {@link #build()} creates a new {@link ExecutionAttemptID}. */
+       public ArchivedExecutionBuilder setAttemptId(ExecutionAttemptID attemptId) {
+               this.attemptId = attemptId;
+               return this;
+       }
+
+       /**
+        * Sets the per-state timestamps.
+        *
+        * @param stateTimestamps array that must contain exactly one entry per {@link ExecutionState}
+        * @throws IllegalArgumentException if the array length differs from the number of states
+        */
+       public ArchivedExecutionBuilder setStateTimestamps(long[] stateTimestamps) {
+               Preconditions.checkArgument(stateTimestamps.length == ExecutionState.values().length);
+               this.stateTimestamps = stateTimestamps;
+               return this;
+       }
+
+       /** Sets the attempt number (defaults to {@code 0}). */
+       public ArchivedExecutionBuilder setAttemptNumber(int attemptNumber) {
+               this.attemptNumber = attemptNumber;
+               return this;
+       }
+
+       /** Sets the execution state; if unset, {@link ExecutionState#FINISHED} is used. */
+       public ArchivedExecutionBuilder setState(ExecutionState state) {
+               this.state = state;
+               return this;
+       }
+
+       /** Sets the failure cause; if unset, the literal string {@code "(null)"} is used. */
+       public ArchivedExecutionBuilder setFailureCause(String failureCause) {
+               this.failureCause = failureCause;
+               return this;
+       }
+
+       /** Sets the task manager location; if unset, a location on the local host is created. */
+       public ArchivedExecutionBuilder setAssignedResourceLocation(TaskManagerLocation assignedResourceLocation) {
+               this.assignedResourceLocation = assignedResourceLocation;
+               return this;
+       }
+
+       /** Sets the stringified user accumulators; if unset, an empty array is used. */
+       public ArchivedExecutionBuilder setUserAccumulators(StringifiedAccumulatorResult[] userAccumulators) {
+               this.userAccumulators = userAccumulators;
+               return this;
+       }
+
+       /** Sets the parallel subtask index (defaults to {@code 0}). */
+       public ArchivedExecutionBuilder setParallelSubtaskIndex(int parallelSubtaskIndex) {
+               this.parallelSubtaskIndex = parallelSubtaskIndex;
+               return this;
+       }
+
+       /** Sets the I/O metrics; if unset, metrics backed by fixed test counters are used. */
+       public ArchivedExecutionBuilder setIOMetrics(IOMetrics ioMetrics) {
+               this.ioMetrics = ioMetrics;
+               return this;
+       }
+
+       /**
+        * Creates the {@link ArchivedExecution}, filling in a default for every unset property.
+        *
+        * @return the assembled execution
+        * @throws UnknownHostException if the default task manager location is needed and the
+        *         local host address cannot be resolved
+        */
+       public ArchivedExecution build() throws UnknownHostException {
+               return new ArchivedExecution(
+                       userAccumulators != null ? userAccumulators : new StringifiedAccumulatorResult[0],
+                       ioMetrics != null ? ioMetrics : new TestIOMetrics(),
+                       attemptId != null ? attemptId : new ExecutionAttemptID(),
+                       attemptNumber,
+                       state != null ? state : ExecutionState.FINISHED,
+                       failureCause != null ? failureCause : "(null)",
+                       assignedResourceLocation != null
+                               ? assignedResourceLocation
+                               : new TaskManagerLocation(new ResourceID("tm"), InetAddress.getLocalHost(), 1234),
+                       parallelSubtaskIndex,
+                       stateTimestamps != null ? stateTimestamps : defaultStateTimestamps()
+               );
+       }
+
+       /**
+        * Creates the default timestamps with one entry per {@link ExecutionState}, so the
+        * default always satisfies the length check enforced by
+        * {@link #setStateTimestamps(long[])}.
+        *
+        * <p>Entries count up from 1 and are capped at 5; for eight states this yields
+        * {@code {1, 2, 3, 4, 5, 5, 5, 5}}, the values that used to be hard-coded.
+        */
+       private static long[] defaultStateTimestamps() {
+               long[] timestamps = new long[ExecutionState.values().length];
+               for (int i = 0; i < timestamps.length; i++) {
+                       timestamps[i] = Math.min(i + 1, 5);
+               }
+               return timestamps;
+       }
+
+       /** {@link IOMetrics} built from five meters backed by fixed counters with the values 1 to 5. */
+       private static class TestIOMetrics extends IOMetrics {
+               private static final long serialVersionUID = -5920076211680012555L;
+
+               public TestIOMetrics() {
+                       super(
+                               new MeterView(new TestCounter(1), 0),
+                               new MeterView(new TestCounter(2), 0),
+                               new MeterView(new TestCounter(3), 0),
+                               new MeterView(new TestCounter(4), 0),
+                               new MeterView(new TestCounter(5), 0));
+               }
+       }
+
+       /** {@link Counter} whose count is fixed at construction; increments and decrements are no-ops. */
+       private static class TestCounter implements Counter {
+               private final long count;
+
+               private TestCounter(long count) {
+                       this.count = count;
+               }
+
+               @Override
+               public void inc() {
+               }
+
+               @Override
+               public void inc(long n) {
+               }
+
+               @Override
+               public void dec() {
+               }
+
+               @Override
+               public void dec(long n) {
+               }
+
+               @Override
+               public long getCount() {
+                       return count;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
new file mode 100644
index 0000000..0880133
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionMode;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Test utility that assembles {@link ArchivedExecutionConfig} instances.
+ *
+ * <p>Unset reference-typed properties are replaced by defaults in {@link #build()};
+ * the parallelism defaults to {@code 0} and object reuse to {@code false}.
+ */
+public class ArchivedExecutionConfigBuilder {
+       private String executionMode;
+       private String restartStrategyDescription;
+       private int parallelism;
+       private boolean objectReuseEnabled;
+       private Map<String, String> globalJobParameters;
+
+       /** Sets the execution mode name; if unset, the name of {@link ExecutionMode#PIPELINED} is used. */
+       public ArchivedExecutionConfigBuilder setExecutionMode(String executionMode) {
+               this.executionMode = executionMode;
+               return this;
+       }
+
+       /** Sets the restart strategy description; if unset, {@code "default"} is used. */
+       public ArchivedExecutionConfigBuilder setRestartStrategyDescription(String restartStrategyDescription) {
+               this.restartStrategyDescription = restartStrategyDescription;
+               return this;
+       }
+
+       /** Sets the parallelism (defaults to {@code 0}). */
+       public ArchivedExecutionConfigBuilder setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+               return this;
+       }
+
+       /** Sets the object-reuse flag (defaults to {@code false}). */
+       public ArchivedExecutionConfigBuilder setObjectReuseEnabled(boolean objectReuseEnabled) {
+               this.objectReuseEnabled = objectReuseEnabled;
+               return this;
+       }
+
+       /** Sets the global job parameters; if unset, an empty map is used. */
+       public ArchivedExecutionConfigBuilder setGlobalJobParameters(Map<String, String> globalJobParameters) {
+               this.globalJobParameters = globalJobParameters;
+               return this;
+       }
+
+       /**
+        * Creates the {@link ArchivedExecutionConfig}, filling in a default for every unset
+        * reference-typed property.
+        */
+       public ArchivedExecutionConfig build() {
+               String mode = executionMode;
+               if (mode == null) {
+                       mode = ExecutionMode.PIPELINED.name();
+               }
+
+               String restartStrategy = restartStrategyDescription;
+               if (restartStrategy == null) {
+                       restartStrategy = "default";
+               }
+
+               Map<String, String> jobParameters = globalJobParameters;
+               if (jobParameters == null) {
+                       jobParameters = Collections.<String, String>emptyMap();
+               }
+
+               return new ArchivedExecutionConfig(
+                       mode,
+                       restartStrategy,
+                       parallelism,
+                       objectReuseEnabled,
+                       jobParameters);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
new file mode 100644
index 0000000..1514a5a
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Test utility that assembles {@link ArchivedExecutionGraph} instances.
+ *
+ * <p>Only the task map is mandatory; every other reference-typed property that has not
+ * been set is replaced by a default in {@link #build()}.
+ */
+public class ArchivedExecutionGraphBuilder {
+
+       /** Source of the numeric suffix for generated job names. */
+       private static final Random RANDOM = new Random();
+
+       private JobID jobID;
+       private String jobName;
+       private Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
+       private List<ArchivedExecutionJobVertex> verticesInCreationOrder;
+       private long[] stateTimestamps;
+       private JobStatus state;
+       private String failureCause;
+       private String jsonPlan;
+       private StringifiedAccumulatorResult[] archivedUserAccumulators;
+       private ArchivedExecutionConfig archivedExecutionConfig;
+       private boolean isStoppable;
+       private Map<String, SerializedValue<Object>> serializedUserAccumulators;
+
+       /** Sets the job id; if unset, a new {@link JobID} is created. */
+       public ArchivedExecutionGraphBuilder setJobID(JobID jobID) {
+               this.jobID = jobID;
+               return this;
+       }
+
+       /** Sets the job name; if unset, a name of the form {@code job_<random int>} is generated. */
+       public ArchivedExecutionGraphBuilder setJobName(String jobName) {
+               this.jobName = jobName;
+               return this;
+       }
+
+       /** Sets the tasks by vertex id; this property is mandatory for {@link #build()}. */
+       public ArchivedExecutionGraphBuilder setTasks(Map<JobVertexID, ArchivedExecutionJobVertex> tasks) {
+               this.tasks = tasks;
+               return this;
+       }
+
+       /** Sets the vertices in creation order; if unset, the values of the task map are used. */
+       public ArchivedExecutionGraphBuilder setVerticesInCreationOrder(
+                       List<ArchivedExecutionJobVertex> verticesInCreationOrder) {
+               this.verticesInCreationOrder = verticesInCreationOrder;
+               return this;
+       }
+
+       /**
+        * Sets the per-status timestamps.
+        *
+        * @param stateTimestamps array that must contain exactly one entry per {@link JobStatus}
+        * @throws IllegalArgumentException if the array length differs from the number of statuses
+        */
+       public ArchivedExecutionGraphBuilder setStateTimestamps(long[] stateTimestamps) {
+               Preconditions.checkArgument(stateTimestamps.length == JobStatus.values().length);
+               this.stateTimestamps = stateTimestamps;
+               return this;
+       }
+
+       /** Sets the job status; if unset, {@link JobStatus#FINISHED} is used. */
+       public ArchivedExecutionGraphBuilder setState(JobStatus state) {
+               this.state = state;
+               return this;
+       }
+
+       /** Sets the failure cause; if unset, the literal string {@code "(null)"} is used. */
+       public ArchivedExecutionGraphBuilder setFailureCause(String failureCause) {
+               this.failureCause = failureCause;
+               return this;
+       }
+
+       /** Sets the JSON plan; if unset, a minimal plan with the job id, job name and no nodes is generated. */
+       public ArchivedExecutionGraphBuilder setJsonPlan(String jsonPlan) {
+               this.jsonPlan = jsonPlan;
+               return this;
+       }
+
+       /** Sets the stringified user accumulators; if unset, an empty array is used. */
+       public ArchivedExecutionGraphBuilder setArchivedUserAccumulators(
+                       StringifiedAccumulatorResult[] archivedUserAccumulators) {
+               this.archivedUserAccumulators = archivedUserAccumulators;
+               return this;
+       }
+
+       /** Sets the execution config; if unset, the default of {@link ArchivedExecutionConfigBuilder} is used. */
+       public ArchivedExecutionGraphBuilder setArchivedExecutionConfig(
+                       ArchivedExecutionConfig archivedExecutionConfig) {
+               this.archivedExecutionConfig = archivedExecutionConfig;
+               return this;
+       }
+
+       /** Sets the stoppable flag (defaults to {@code false}). */
+       public ArchivedExecutionGraphBuilder setStoppable(boolean stoppable) {
+               this.isStoppable = stoppable;
+               return this;
+       }
+
+       /** Sets the serialized user accumulators; if unset, an empty map is used. */
+       public ArchivedExecutionGraphBuilder setSerializedUserAccumulators(
+                       Map<String, SerializedValue<Object>> serializedUserAccumulators) {
+               this.serializedUserAccumulators = serializedUserAccumulators;
+               return this;
+       }
+
+       /**
+        * Creates the {@link ArchivedExecutionGraph}, filling in a default for every unset
+        * reference-typed property.
+        *
+        * @throws NullPointerException if no tasks have been set
+        */
+       public ArchivedExecutionGraph build() {
+               Preconditions.checkNotNull(tasks, "Tasks must not be null.");
+
+               // id and name are resolved first because the generated JSON plan embeds both
+               JobID effectiveJobId = this.jobID;
+               if (effectiveJobId == null) {
+                       effectiveJobId = new JobID();
+               }
+               String effectiveJobName = this.jobName;
+               if (effectiveJobName == null) {
+                       effectiveJobName = "job_" + RANDOM.nextInt();
+               }
+
+               List<ArchivedExecutionJobVertex> orderedVertices = verticesInCreationOrder;
+               if (orderedVertices == null) {
+                       orderedVertices = new ArrayList<>(tasks.values());
+               }
+               long[] timestamps = stateTimestamps;
+               if (timestamps == null) {
+                       timestamps = new long[JobStatus.values().length];
+               }
+               JobStatus status = state;
+               if (status == null) {
+                       status = JobStatus.FINISHED;
+               }
+               String cause = failureCause;
+               if (cause == null) {
+                       cause = "(null)";
+               }
+               String plan = jsonPlan;
+               if (plan == null) {
+                       plan = "{\"jobid\":\"" + effectiveJobId + "\", \"name\":\"" + effectiveJobName + "\", \"nodes\":[]}";
+               }
+               StringifiedAccumulatorResult[] stringifiedAccumulators = archivedUserAccumulators;
+               if (stringifiedAccumulators == null) {
+                       stringifiedAccumulators = new StringifiedAccumulatorResult[0];
+               }
+               Map<String, SerializedValue<Object>> serializedAccumulators = serializedUserAccumulators;
+               if (serializedAccumulators == null) {
+                       serializedAccumulators = Collections.<String, SerializedValue<Object>>emptyMap();
+               }
+               ArchivedExecutionConfig executionConfig = archivedExecutionConfig;
+               if (executionConfig == null) {
+                       executionConfig = new ArchivedExecutionConfigBuilder().build();
+               }
+
+               // the two trailing constructor arguments are always passed as null
+               return new ArchivedExecutionGraph(
+                       effectiveJobId,
+                       effectiveJobName,
+                       tasks,
+                       orderedVertices,
+                       timestamps,
+                       status,
+                       cause,
+                       plan,
+                       stringifiedAccumulators,
+                       serializedAccumulators,
+                       executionConfig,
+                       isStoppable,
+                       null,
+                       null);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
new file mode 100644
index 0000000..8a45d35
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Random;
+
+/**
+ * Test utility that assembles {@link ArchivedExecutionJobVertex} instances.
+ *
+ * <p>Only the task vertices are mandatory; unset reference-typed properties are replaced
+ * by defaults in {@link #build()}, and both parallelism values default to {@code 0}.
+ */
+public class ArchivedExecutionJobVertexBuilder {
+
+       /** Source of the numeric suffix for generated task names. */
+       private static final Random RANDOM = new Random();
+
+       private ArchivedExecutionVertex[] taskVertices;
+       private JobVertexID id;
+       private String name;
+       private int parallelism;
+       private int maxParallelism;
+       private StringifiedAccumulatorResult[] archivedUserAccumulators;
+
+       /** Sets the task vertices; this property is mandatory for {@link #build()}. */
+       public ArchivedExecutionJobVertexBuilder setTaskVertices(ArchivedExecutionVertex[] taskVertices) {
+               this.taskVertices = taskVertices;
+               return this;
+       }
+
+       /** Sets the vertex id; if unset, a new {@link JobVertexID} is created. */
+       public ArchivedExecutionJobVertexBuilder setId(JobVertexID id) {
+               this.id = id;
+               return this;
+       }
+
+       /** Sets the task name; if unset, a name of the form {@code task_<random int>} is generated. */
+       public ArchivedExecutionJobVertexBuilder setName(String name) {
+               this.name = name;
+               return this;
+       }
+
+       /** Sets the parallelism (defaults to {@code 0}). */
+       public ArchivedExecutionJobVertexBuilder setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+               return this;
+       }
+
+       /** Sets the maximum parallelism (defaults to {@code 0}). */
+       public ArchivedExecutionJobVertexBuilder setMaxParallelism(int maxParallelism) {
+               this.maxParallelism = maxParallelism;
+               return this;
+       }
+
+       /** Sets the stringified user accumulators; if unset, an empty array is used. */
+       public ArchivedExecutionJobVertexBuilder setArchivedUserAccumulators(
+                       StringifiedAccumulatorResult[] archivedUserAccumulators) {
+               this.archivedUserAccumulators = archivedUserAccumulators;
+               return this;
+       }
+
+       /**
+        * Creates the {@link ArchivedExecutionJobVertex}, filling in a default for every unset
+        * reference-typed property.
+        *
+        * @throws NullPointerException if no task vertices have been set
+        */
+       public ArchivedExecutionJobVertex build() {
+               // carry a message, like the equivalent check in ArchivedExecutionGraphBuilder#build()
+               Preconditions.checkNotNull(taskVertices, "Task vertices must not be null.");
+               return new ArchivedExecutionJobVertex(
+                       taskVertices,
+                       id != null ? id : new JobVertexID(),
+                       name != null ? name : "task_" + RANDOM.nextInt(),
+                       parallelism,
+                       maxParallelism,
+                       archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0]
+               );
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
new file mode 100644
index 0000000..3707374
--- /dev/null
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.util.EvictingBoundedList;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.Random;
+
+public class ArchivedExecutionVertexBuilder { // Fluent builder (under src/test) that assembles ArchivedExecutionVertex instances field by field.
+
+       private static final Random RANDOM = new Random(); // supplies the numeric part of generated task names in build()
+
+       private int subtaskIndex; // stays 0 unless setSubtaskIndex is called
+       private EvictingBoundedList<ArchivedExecution> priorExecutions; // null until setPriorExecutions is called; build() then uses an empty list
+       private String taskNameWithSubtask; // null means build() generates a name
+       private ArchivedExecution currentExecution; // required: build() checks it for null
+
+       public ArchivedExecutionVertexBuilder setSubtaskIndex(int subtaskIndex) 
{ // Sets the subtask index; it is also appended to the generated task name in build().
+               this.subtaskIndex = subtaskIndex;
+               return this; // the builder itself, so calls can be chained
+       }
+
+       public ArchivedExecutionVertexBuilder 
setPriorExecutions(List<ArchivedExecution> priorExecutions) { // Copies the given executions, in iteration order, into a new EvictingBoundedList.
+               this.priorExecutions = new 
EvictingBoundedList<>(priorExecutions.size()); // constructed with the list's size; NOTE(review): a null argument fails here with a NullPointerException
+               for (ArchivedExecution execution : priorExecutions) {
+                       this.priorExecutions.add(execution);
+               }
+               return this; // the builder itself, so calls can be chained
+       }
+
+       public ArchivedExecutionVertexBuilder setTaskNameWithSubtask(String 
taskNameWithSubtask) { // Sets the task name; if left null, build() generates "task_" + random int + "_" + subtaskIndex.
+               this.taskNameWithSubtask = taskNameWithSubtask;
+               return this; // the builder itself, so calls can be chained
+       }
+
+       public ArchivedExecutionVertexBuilder 
setCurrentExecution(ArchivedExecution currentExecution) { // Sets the current execution, the one value build() insists on.
+               this.currentExecution = currentExecution;
+               return this; // the builder itself, so calls can be chained
+       }
+
+       public ArchivedExecutionVertex build() { // Creates the ArchivedExecutionVertex, substituting defaults for an unset name and unset prior executions.
+               Preconditions.checkNotNull(currentExecution); // currentExecution has no fallback
+               return new ArchivedExecutionVertex(
+                       subtaskIndex,
+                       taskNameWithSubtask != null ? taskNameWithSubtask : 
"task_" + RANDOM.nextInt() + "_" + subtaskIndex, // generated name when none was set
+                       currentExecution,
+                       priorExecutions != null ? priorExecutions : new 
EvictingBoundedList<ArchivedExecution>(0) // fresh list constructed with 0 when none was set
+               );
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
new file mode 100644
index 0000000..0340d87
--- /dev/null
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.webmonitor.utils;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ArchivedJobGenerationUtils { // Static test helpers: one lazily generated sample job (job, task, subtask, attempt) plus JSON comparison assertions.
+       public static final ObjectMapper mapper = new ObjectMapper();
+       public static final JsonFactory jacksonFactory = new JsonFactory()
+               .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
+               .disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT); // factory configured with AUTO_CLOSE_TARGET on and AUTO_CLOSE_JSON_CONTENT off
+
+       private static ArchivedExecutionGraph originalJob; // also the "fixtures already generated" marker that every getter checks
+       private static ArchivedExecutionJobVertex originalTask;
+       private static ArchivedExecutionVertex originalSubtask;
+       private static ArchivedExecution originalAttempt;
+
+       private static final Object lock = new Object(); // held while checking for, and running, the one-time fixture generation
+
+       private ArchivedJobGenerationUtils() { // private and empty: the class only exposes static members
+       }
+
+       public static AccessExecutionGraph getTestJob() throws Exception { // Returns the sample job, generating all four fixtures first if that has not happened yet.
+               synchronized (lock) {
+                       if (originalJob == null) {
+                               generateArchivedJob();
+                       }
+               }
+               return originalJob;
+       }
+
+       public static AccessExecutionJobVertex getTestTask() throws Exception { // Returns the sample job vertex, generating all four fixtures first if needed.
+               synchronized (lock) {
+                       if (originalJob == null) { // the job field, not the task field, is the generation marker
+                               generateArchivedJob();
+                       }
+               }
+               return originalTask;
+       }
+
+       public static AccessExecutionVertex getTestSubtask() throws Exception { // Returns the sample subtask, generating all four fixtures first if needed.
+               synchronized (lock) {
+                       if (originalJob == null) { // the job field, not the subtask field, is the generation marker
+                               generateArchivedJob();
+                       }
+               }
+               return originalSubtask;
+       }
+
+       public static AccessExecution getTestAttempt() throws Exception { // Returns the sample execution attempt, generating all four fixtures first if needed.
+               synchronized (lock) {
+                       if (originalJob == null) { // the job field, not the attempt field, is the generation marker
+                               generateArchivedJob();
+                       }
+               }
+               return originalAttempt;
+       }
+
+       private static void generateArchivedJob() throws Exception { // Builds attempt, subtask, task and job bottom-up; originalJob is assigned last, so it stays null if an earlier step throws.
+               // Attempt
+               StringifiedAccumulatorResult acc1 = new 
StringifiedAccumulatorResult("name1", "type1", "value1");
+               StringifiedAccumulatorResult acc2 = new 
StringifiedAccumulatorResult("name2", "type2", "value2"); // acc1 and acc2 are reused for both the attempt and the job below
+               TaskManagerLocation location = new TaskManagerLocation(new 
ResourceID("hello"), InetAddress.getLocalHost(), 1234); // InetAddress.getLocalHost() can throw UnknownHostException
+               originalAttempt = new ArchivedExecutionBuilder()
+                       .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 
9}) // nine entries, presumably one per ExecutionState value; TODO confirm
+                       .setParallelSubtaskIndex(1)
+                       .setAttemptNumber(3)
+                       .setAssignedResourceLocation(location)
+                       .setUserAccumulators(new 
StringifiedAccumulatorResult[]{acc1, acc2})
+                       .setState(ExecutionState.FINISHED)
+                       .setFailureCause("attemptException") // a failure cause is set even though the state is FINISHED
+                       .build();
+               // Subtask
+               originalSubtask = new ArchivedExecutionVertexBuilder()
+                       
.setSubtaskIndex(originalAttempt.getParallelSubtaskIndex()) // reuses the attempt's subtask index
+                       .setTaskNameWithSubtask("hello(1/1)")
+                       .setCurrentExecution(originalAttempt)
+                       .build();
+               // Task
+               originalTask = new ArchivedExecutionJobVertexBuilder()
+                       .setTaskVertices(new 
ArchivedExecutionVertex[]{originalSubtask}) // only the vertices are set; every other value is whatever the builder's build() supplies
+                       .build();
+               // Job
+               Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new 
HashMap<>();
+               tasks.put(originalTask.getJobVertexId(), originalTask); // single-entry map keyed by the task's own vertex id
+               originalJob = new ArchivedExecutionGraphBuilder()
+                       .setJobID(new JobID())
+                       .setTasks(tasks)
+                       .setFailureCause("jobException") // a failure cause is set even though the state is FINISHED
+                       .setState(JobStatus.FINISHED)
+                       .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 
9, 10}) // ten entries, presumably one per JobStatus value; TODO confirm
+                       .setArchivedUserAccumulators(new 
StringifiedAccumulatorResult[]{acc1, acc2})
+                       .build();
+       }
+
+       // 
========================================================================
+       // utility methods
+       // 
========================================================================
+
+       public static void 
compareStringifiedAccumulators(StringifiedAccumulatorResult[] expectedAccs, 
ArrayNode writtenAccs) { // Asserts that the JSON array matches the expected accumulators in size and, index by index, in name, type and value.
+               assertEquals(expectedAccs.length, writtenAccs.size());
+               for (int x = 0; x < expectedAccs.length; x++) {
+                       JsonNode acc = writtenAccs.get(x); // entries are matched by position, so order matters
+
+                       assertEquals(expectedAccs[x].getName(), 
acc.get("name").asText());
+                       assertEquals(expectedAccs[x].getType(), 
acc.get("type").asText());
+                       assertEquals(expectedAccs[x].getValue(), 
acc.get("value").asText());
+               }
+       }
+
+       public static void compareIoMetrics(IOMetrics expectedMetrics, JsonNode 
writtenMetrics) { // Asserts the four JSON fields read-bytes, write-bytes, read-records and write-records against the expected metrics.
+               assertEquals(expectedMetrics.getNumBytesInTotal(), 
writtenMetrics.get("read-bytes").asLong()); // read-bytes is compared with getNumBytesInTotal()
+               assertEquals(expectedMetrics.getNumBytesOut(), 
writtenMetrics.get("write-bytes").asLong());
+               assertEquals(expectedMetrics.getNumRecordsIn(), 
writtenMetrics.get("read-records").asLong());
+               assertEquals(expectedMetrics.getNumRecordsOut(), 
writtenMetrics.get("write-records").asLong());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
index c189d42..4b1c62f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java
@@ -59,6 +59,21 @@ public class ArchivedExecution implements AccessExecution, 
Serializable {
                this.ioMetrics = execution.getIOMetrics();
        }
 
+       public ArchivedExecution(
+                       StringifiedAccumulatorResult[] userAccumulators, 
IOMetrics ioMetrics,
+                       ExecutionAttemptID attemptId, int attemptNumber, 
ExecutionState state, String failureCause,
+                       TaskManagerLocation assignedResourceLocation, int 
parallelSubtaskIndex, long[] stateTimestamps) { // Value constructor: assigns each argument to the field of the same name, with no validation and no copying.
+               this.userAccumulators = userAccumulators; // keeps the caller's array reference
+               this.ioMetrics = ioMetrics;
+               this.failureCause = failureCause;
+               this.assignedResourceLocation = assignedResourceLocation;
+               this.attemptNumber = attemptNumber;
+               this.attemptId = attemptId;
+               this.state = state;
+               this.stateTimestamps = stateTimestamps; // keeps the caller's array reference
+               this.parallelSubtaskIndex = parallelSubtaskIndex;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //   Accessors
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
index c744907..6b54760 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java
@@ -54,6 +54,21 @@ public class ArchivedExecutionJobVertex implements 
AccessExecutionJobVertex, Ser
                this.maxParallelism = jobVertex.getMaxParallelism();
        }
 
+       public ArchivedExecutionJobVertex(
+                       ArchivedExecutionVertex[] taskVertices,
+                       JobVertexID id,
+                       String name,
+                       int parallelism,
+                       int maxParallelism,
+                       StringifiedAccumulatorResult[] 
archivedUserAccumulators) { // Value constructor: assigns each argument to the field of the same name, with no validation and no copying.
+               this.taskVertices = taskVertices; // keeps the caller's array reference
+               this.id = id;
+               this.name = name;
+               this.parallelism = parallelism;
+               this.maxParallelism = maxParallelism;
+               this.archivedUserAccumulators = archivedUserAccumulators; // keeps the caller's array reference
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //   Accessors
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
index 5053cae..36669d3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java
@@ -45,6 +45,15 @@ public class ArchivedExecutionVertex implements 
AccessExecutionVertex, Serializa
                this.currentExecution = 
vertex.getCurrentExecutionAttempt().archive();
        }
 
+       public ArchivedExecutionVertex(
+                       int subTaskIndex, String taskNameWithSubtask,
+                       ArchivedExecution currentExecution, 
EvictingBoundedList<ArchivedExecution> priorExecutions) { // Value constructor: assigns each argument to the field of the same name, with no validation and no copying.
+               this.subTaskIndex = subTaskIndex;
+               this.taskNameWithSubtask = taskNameWithSubtask;
+               this.currentExecution = currentExecution;
+               this.priorExecutions = priorExecutions; // keeps the caller's list instance
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //   Accessors
        // 
--------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
index 15c54b4..e0472ba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java
@@ -26,19 +26,19 @@ import java.io.Serializable;
  */
 public class IOMetrics implements Serializable {
        private static final long serialVersionUID = -7208093607556457183L;
-       private final long numRecordsIn;
-       private final long numRecordsOut;
+       protected long numRecordsIn;
+       protected long numRecordsOut;
 
-       private final double numRecordsInPerSecond;
-       private final double numRecordsOutPerSecond;
+       protected double numRecordsInPerSecond;
+       protected double numRecordsOutPerSecond;
 
-       private final long numBytesInLocal;
-       private final long numBytesInRemote;
-       private final long numBytesOut;
+       protected long numBytesInLocal;
+       protected long numBytesInRemote;
+       protected long numBytesOut;
 
-       private final double numBytesInLocalPerSecond;
-       private final double numBytesInRemotePerSecond;
-       private final double numBytesOutPerSecond;
+       protected double numBytesInLocalPerSecond;
+       protected double numBytesInRemotePerSecond;
+       protected double numBytesOutPerSecond;
 
        public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, 
Meter bytesRemoteIn, Meter bytesOut) {
                this.numRecordsIn = recordsIn.getCount();
@@ -53,6 +53,22 @@ public class IOMetrics implements Serializable {
                this.numBytesOutPerSecond = bytesOut.getRate();
        }
 
+       public IOMetrics(
+                       long numBytesInLocal, long numBytesInRemote, long 
numBytesOut, long numRecordsIn, long numRecordsOut,
+                       double numBytesInLocalPerSecond, double 
numBytesInRemotePerSecond, double numBytesOutPerSecond,
+                       double numRecordsInPerSecond, double 
numRecordsOutPerSecond) { // Value constructor: stores the five counters and five per-second rates as given. Counters are long to match the long fields; int arguments still widen implicitly.
+               this.numBytesInLocal = numBytesInLocal;
+               this.numBytesInRemote = numBytesInRemote;
+               this.numBytesOut = numBytesOut;
+               this.numRecordsIn = numRecordsIn;
+               this.numRecordsOut = numRecordsOut;
+               this.numBytesInLocalPerSecond = numBytesInLocalPerSecond;
+               this.numBytesInRemotePerSecond = numBytesInRemotePerSecond;
+               this.numBytesOutPerSecond = numBytesOutPerSecond;
+               this.numRecordsInPerSecond = numRecordsInPerSecond;
+               this.numRecordsOutPerSecond = numRecordsOutPerSecond;
+       }
+
        public long getNumRecordsIn() {
                return numRecordsIn;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
index 3090172..b3e9d5d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java
@@ -106,9 +106,13 @@ public class ArchivedExecutionGraphTest {
                        new NoRestartStrategy());
                runtimeGraph.attachJobGraph(vertices);
 
+               List<ExecutionJobVertex> jobVertices = new ArrayList<>();
+               jobVertices.add(runtimeGraph.getJobVertex(v1ID));
+               jobVertices.add(runtimeGraph.getJobVertex(v2ID));
+               
                CheckpointStatsTracker statsTracker = new 
CheckpointStatsTracker(
                                0,
-                               Collections.<ExecutionJobVertex>emptyList(),
+                               jobVertices,
                                mock(JobSnapshottingSettings.class),
                                new UnregisteredMetricsGroup());
 

Reply via email to