Repository: flink
Updated Branches:
  refs/heads/master 327701032 -> 4fc019a96


http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionConfigBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionConfigBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionConfigBuilder.java
new file mode 100644
index 0000000..cb7fe90
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionConfigBuilder.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.utils;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.ExecutionMode;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * Utility class for constructing an ArchivedExecutionConfig.
+ */
+public class ArchivedExecutionConfigBuilder {
+       private String executionMode;
+       private String restartStrategyDescription;
+       private int parallelism;
+       private boolean objectReuseEnabled;
+       private Map<String, String> globalJobParameters;
+
+       public ArchivedExecutionConfigBuilder setExecutionMode(String executionMode) {
+               this.executionMode = executionMode;
+               return this;
+       }
+
+       public ArchivedExecutionConfigBuilder setRestartStrategyDescription(String restartStrategyDescription) {
+               this.restartStrategyDescription = restartStrategyDescription;
+               return this;
+       }
+
+       public ArchivedExecutionConfigBuilder setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+               return this;
+       }
+
+       public ArchivedExecutionConfigBuilder setObjectReuseEnabled(boolean objectReuseEnabled) {
+               this.objectReuseEnabled = objectReuseEnabled;
+               return this;
+       }
+
+       public ArchivedExecutionConfigBuilder setGlobalJobParameters(Map<String, String> globalJobParameters) {
+               this.globalJobParameters = globalJobParameters;
+               return this;
+       }
+
+       public ArchivedExecutionConfig build() {
+               return new ArchivedExecutionConfig(
+                       executionMode != null ? executionMode : ExecutionMode.PIPELINED.name(),
+                       restartStrategyDescription != null ? restartStrategyDescription : "default",
+                       parallelism,
+                       objectReuseEnabled,
+                       globalJobParameters != null ? globalJobParameters : Collections.<String, String>emptyMap()
+               );
+       }
+}
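
Usage sketch (illustrative only, not part of this patch; the values are arbitrary and the
imports match the file above):

    ArchivedExecutionConfig config = new ArchivedExecutionConfigBuilder()
        .setExecutionMode(ExecutionMode.BATCH.name())
        .setRestartStrategyDescription("fixed-delay")
        .setParallelism(4)
        .setObjectReuseEnabled(true)
        .setGlobalJobParameters(Collections.singletonMap("key", "value"))
        .build();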

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
new file mode 100644
index 0000000..68077ba
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.utils;
+
+import org.apache.flink.api.common.ArchivedExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * Utility class for constructing an ArchivedExecutionGraph.
+ */
+public class ArchivedExecutionGraphBuilder {
+
+       private static final Random RANDOM = new Random();
+
+       private JobID jobID;
+       private String jobName;
+       private Map<JobVertexID, ArchivedExecutionJobVertex> tasks;
+       private List<ArchivedExecutionJobVertex> verticesInCreationOrder;
+       private long[] stateTimestamps;
+       private JobStatus state;
+       private ErrorInfo failureCause;
+       private String jsonPlan;
+       private StringifiedAccumulatorResult[] archivedUserAccumulators;
+       private ArchivedExecutionConfig archivedExecutionConfig;
+       private boolean isStoppable;
+       private Map<String, SerializedValue<Object>> serializedUserAccumulators;
+
+       public ArchivedExecutionGraphBuilder setJobID(JobID jobID) {
+               this.jobID = jobID;
+               return this;
+       }
+
+       public ArchivedExecutionGraphBuilder setJobName(String jobName) {
+               this.jobName = jobName;
+               return this;
+       }
+
+       public ArchivedExecutionGraphBuilder setTasks(Map<JobVertexID, ArchivedExecutionJobVertex> tasks) {
+               this.tasks = tasks;
+               return this;
+       }
+
+       public ArchivedExecutionGraphBuilder setVerticesInCreationOrder(List<ArchivedExecutionJobVertex> verticesInCreationOrder) {
+               this.verticesInCreationOrder = verticesInCreationOrder;
+               return this;
+       }
+
+       public ArchivedExecutionGraphBuilder setStateTimestamps(long[] stateTimestamps) {
+               Preconditions.checkArgument(stateTimestamps.length == JobStatus.values().length);
+               this.stateTimestamps = stateTimestamps;
+               return this;
+       }
+
+       public ArchivedExecutionGraphBuilder setState(JobStatus state) {
+               this.state = state;
+               return this;
+       }
+
+       public ArchivedExecutionGraphBuilder setFailureCause(ErrorInfo failureCause) {
+               this.failureCause = failureCause;
+               return this;
+       }
+
+       public ArchivedExecutionGraphBuilder setJsonPlan(String jsonPlan) {
+               this.jsonPlan = jsonPlan;
+               return this;
+       }
+
+       public ArchivedExecutionGraphBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[] archivedUserAccumulators) {
+               this.archivedUserAccumulators = archivedUserAccumulators;
+               return this;
+       }
+
+       public ArchivedExecutionGraphBuilder setArchivedExecutionConfig(ArchivedExecutionConfig archivedExecutionConfig) {
+               this.archivedExecutionConfig = archivedExecutionConfig;
+               return this;
+       }
+
+       public ArchivedExecutionGraphBuilder setStoppable(boolean stoppable) {
+               isStoppable = stoppable;
+               return this;
+       }
+
+       public ArchivedExecutionGraphBuilder setSerializedUserAccumulators(Map<String, SerializedValue<Object>> serializedUserAccumulators) {
+               this.serializedUserAccumulators = serializedUserAccumulators;
+               return this;
+       }
+
+       public ArchivedExecutionGraph build() {
+               Preconditions.checkNotNull(tasks, "Tasks must not be null.");
+               JobID jobID = this.jobID != null ? this.jobID : new JobID();
+               String jobName = this.jobName != null ? this.jobName : "job_" + RANDOM.nextInt();
+               return new ArchivedExecutionGraph(
+                       jobID,
+                       jobName,
+                       tasks,
+                       verticesInCreationOrder != null ? verticesInCreationOrder : new ArrayList<>(tasks.values()),
+                       stateTimestamps != null ? stateTimestamps : new long[JobStatus.values().length],
+                       state != null ? state : JobStatus.FINISHED,
+                       failureCause,
+                       jsonPlan != null ? jsonPlan : "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}",
+                       archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0],
+                       serializedUserAccumulators != null ? serializedUserAccumulators : Collections.<String, SerializedValue<Object>>emptyMap(),
+                       archivedExecutionConfig != null ? archivedExecutionConfig : new ArchivedExecutionConfigBuilder().build(),
+                       isStoppable,
+                       null,
+                       null
+               );
+       }
+}
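
Usage sketch (illustrative only, not part of this patch; names and values are made up,
and java.util.HashMap is assumed in addition to the imports above):

    Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>();
    // tasks would typically be filled with vertices built via the
    // ArchivedExecutionJobVertexBuilder further below
    ArchivedExecutionGraph graph = new ArchivedExecutionGraphBuilder()
        .setJobID(new JobID())
        .setJobName("test-job")
        .setTasks(tasks)
        .setState(JobStatus.RUNNING)
        .build();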

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java
new file mode 100644
index 0000000..814c4db
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.utils;
+
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Random;
+
+/**
+ * Utility class for constructing an ArchivedExecutionJobVertex.
+ */
+public class ArchivedExecutionJobVertexBuilder {
+
+       private static final Random RANDOM = new Random();
+
+       private ArchivedExecutionVertex[] taskVertices;
+       private JobVertexID id;
+       private String name;
+       private int parallelism;
+       private int maxParallelism;
+       private StringifiedAccumulatorResult[] archivedUserAccumulators;
+
+       public ArchivedExecutionJobVertexBuilder setTaskVertices(ArchivedExecutionVertex[] taskVertices) {
+               this.taskVertices = taskVertices;
+               return this;
+       }
+
+       public ArchivedExecutionJobVertexBuilder setId(JobVertexID id) {
+               this.id = id;
+               return this;
+       }
+
+       public ArchivedExecutionJobVertexBuilder setName(String name) {
+               this.name = name;
+               return this;
+       }
+
+       public ArchivedExecutionJobVertexBuilder setParallelism(int parallelism) {
+               this.parallelism = parallelism;
+               return this;
+       }
+
+       public ArchivedExecutionJobVertexBuilder setMaxParallelism(int maxParallelism) {
+               this.maxParallelism = maxParallelism;
+               return this;
+       }
+
+       public ArchivedExecutionJobVertexBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[] archivedUserAccumulators) {
+               this.archivedUserAccumulators = archivedUserAccumulators;
+               return this;
+       }
+
+       public ArchivedExecutionJobVertex build() {
+               Preconditions.checkNotNull(taskVertices);
+               return new ArchivedExecutionJobVertex(
+                       taskVertices,
+                       id != null ? id : new JobVertexID(),
+                       name != null ? name : "task_" + RANDOM.nextInt(),
+                       parallelism,
+                       maxParallelism,
+                       archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0]
+               );
+       }
+}
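
Usage sketch (illustrative only, not part of this patch; 'subtask' stands for an
ArchivedExecutionVertex built elsewhere, e.g. with the builder in the next file):

    ArchivedExecutionJobVertex task = new ArchivedExecutionJobVertexBuilder()
        .setTaskVertices(new ArchivedExecutionVertex[]{subtask})
        .setName("example-task")
        .setParallelism(1)
        .setMaxParallelism(128)
        .build();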

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionVertexBuilder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionVertexBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionVertexBuilder.java
new file mode 100644
index 0000000..935c792
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionVertexBuilder.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.utils;
+
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.util.EvictingBoundedList;
+import org.apache.flink.util.Preconditions;
+
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Utility class for constructing an ArchivedExecutionVertex.
+ */
+public class ArchivedExecutionVertexBuilder {
+
+       private static final Random RANDOM = new Random();
+
+       private int subtaskIndex;
+       private EvictingBoundedList<ArchivedExecution> priorExecutions;
+       private String taskNameWithSubtask;
+       private ArchivedExecution currentExecution;
+
+       public ArchivedExecutionVertexBuilder setSubtaskIndex(int subtaskIndex) {
+               this.subtaskIndex = subtaskIndex;
+               return this;
+       }
+
+       public ArchivedExecutionVertexBuilder setPriorExecutions(List<ArchivedExecution> priorExecutions) {
+               this.priorExecutions = new EvictingBoundedList<>(priorExecutions.size());
+               for (ArchivedExecution execution : priorExecutions) {
+                       this.priorExecutions.add(execution);
+               }
+               return this;
+       }
+
+       public ArchivedExecutionVertexBuilder setTaskNameWithSubtask(String taskNameWithSubtask) {
+               this.taskNameWithSubtask = taskNameWithSubtask;
+               return this;
+       }
+
+       public ArchivedExecutionVertexBuilder setCurrentExecution(ArchivedExecution currentExecution) {
+               this.currentExecution = currentExecution;
+               return this;
+       }
+
+       public ArchivedExecutionVertex build() {
+               Preconditions.checkNotNull(currentExecution);
+               return new ArchivedExecutionVertex(
+                       subtaskIndex,
+                       taskNameWithSubtask != null ? taskNameWithSubtask : "task_" + RANDOM.nextInt() + "_" + subtaskIndex,
+                       currentExecution,
+                       priorExecutions != null ? priorExecutions : new EvictingBoundedList<ArchivedExecution>(0)
+               );
+       }
+}
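
Usage sketch (illustrative only, not part of this patch; 'attempt' stands for an
ArchivedExecution, e.g. built with the ArchivedExecutionBuilder added in this commit):

    ArchivedExecutionVertex subtask = new ArchivedExecutionVertexBuilder()
        .setSubtaskIndex(0)
        .setTaskNameWithSubtask("example-task (1/1)")
        .setCurrentExecution(attempt)
        .build();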

http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
new file mode 100644
index 0000000..d256e92
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecution;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecution;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ErrorInfo;
+import org.apache.flink.runtime.executiongraph.IOMetrics;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+
+import java.net.InetAddress;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Common entry-point for accessing generated ArchivedExecution* components.
+ */
+public class ArchivedJobGenerationUtils {
+       public static final ObjectMapper MAPPER = new ObjectMapper();
+       public static final JsonFactory JACKSON_FACTORY = new JsonFactory()
+               .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET)
+               .disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT);
+
+       private static ArchivedExecutionGraph originalJob;
+       private static ArchivedExecutionJobVertex originalTask;
+       private static ArchivedExecutionVertex originalSubtask;
+       private static ArchivedExecution originalAttempt;
+
+       private static final Object lock = new Object();
+
+       private ArchivedJobGenerationUtils() {
+       }
+
+       public static AccessExecutionGraph getTestJob() throws Exception {
+               synchronized (lock) {
+                       if (originalJob == null) {
+                               generateArchivedJob();
+                       }
+               }
+               return originalJob;
+       }
+
+       public static AccessExecutionJobVertex getTestTask() throws Exception {
+               synchronized (lock) {
+                       if (originalJob == null) {
+                               generateArchivedJob();
+                       }
+               }
+               return originalTask;
+       }
+
+       public static AccessExecutionVertex getTestSubtask() throws Exception {
+               synchronized (lock) {
+                       if (originalJob == null) {
+                               generateArchivedJob();
+                       }
+               }
+               return originalSubtask;
+       }
+
+       public static AccessExecution getTestAttempt() throws Exception {
+               synchronized (lock) {
+                       if (originalJob == null) {
+                               generateArchivedJob();
+                       }
+               }
+               return originalAttempt;
+       }
+
+       private static void generateArchivedJob() throws Exception {
+               // Attempt
+               StringifiedAccumulatorResult acc1 = new StringifiedAccumulatorResult("name1", "type1", "value1");
+               StringifiedAccumulatorResult acc2 = new StringifiedAccumulatorResult("name2", "type2", "value2");
+               TaskManagerLocation location = new TaskManagerLocation(new ResourceID("hello"), InetAddress.getLocalHost(), 1234);
+               originalAttempt = new ArchivedExecutionBuilder()
+                       .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9})
+                       .setParallelSubtaskIndex(1)
+                       .setAttemptNumber(0)
+                       .setAssignedResourceLocation(location)
+                       .setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
+                       .setState(ExecutionState.FINISHED)
+                       .setFailureCause("attemptException")
+                       .build();
+               // Subtask
+               originalSubtask = new ArchivedExecutionVertexBuilder()
+                       .setSubtaskIndex(originalAttempt.getParallelSubtaskIndex())
+                       .setTaskNameWithSubtask("hello(1/1)")
+                       .setCurrentExecution(originalAttempt)
+                       .build();
+               // Task
+               originalTask = new ArchivedExecutionJobVertexBuilder()
+                       .setTaskVertices(new ArchivedExecutionVertex[]{originalSubtask})
+                       .build();
+               // Job
+               Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>();
+               tasks.put(originalTask.getJobVertexId(), originalTask);
+               originalJob = new ArchivedExecutionGraphBuilder()
+                       .setJobID(new JobID())
+                       .setTasks(tasks)
+                       .setFailureCause(new ErrorInfo(new Exception("jobException"), originalAttempt.getStateTimestamp(ExecutionState.FAILED)))
+                       .setState(JobStatus.FINISHED)
+                       .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10})
+                       .setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2})
+                       .build();
+       }
+
+       // ========================================================================
+       // utility methods
+       // ========================================================================
+
+       public static void compareStringifiedAccumulators(StringifiedAccumulatorResult[] expectedAccs, ArrayNode writtenAccs) {
+               assertEquals(expectedAccs.length, writtenAccs.size());
+               for (int x = 0; x < expectedAccs.length; x++) {
+                       JsonNode acc = writtenAccs.get(x);
+
+                       assertEquals(expectedAccs[x].getName(), acc.get("name").asText());
+                       assertEquals(expectedAccs[x].getType(), acc.get("type").asText());
+                       assertEquals(expectedAccs[x].getValue(), acc.get("value").asText());
+               }
+       }
+
+       public static void compareIoMetrics(IOMetrics expectedMetrics, JsonNode writtenMetrics) {
+               assertEquals(expectedMetrics.getNumBytesInTotal(), writtenMetrics.get("read-bytes").asLong());
+               assertEquals(expectedMetrics.getNumBytesOut(), writtenMetrics.get("write-bytes").asLong());
+               assertEquals(expectedMetrics.getNumRecordsIn(), writtenMetrics.get("read-records").asLong());
+               assertEquals(expectedMetrics.getNumRecordsOut(), writtenMetrics.get("write-records").asLong());
+       }
+}
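
Usage sketch (illustrative only, not part of this patch; assumed to run inside a test
method that declares 'throws Exception'):

    AccessExecutionGraph job = ArchivedJobGenerationUtils.getTestJob();
    AccessExecutionVertex subtask = ArchivedJobGenerationUtils.getTestSubtask();
    // A handler test can serialize 'job' with MAPPER / JACKSON_FACTORY and compare the
    // resulting JSON against the original data, e.g. via compareStringifiedAccumulators(...).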
