Repository: flink Updated Branches: refs/heads/master 327701032 -> 4fc019a96
http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionConfigBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionConfigBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionConfigBuilder.java new file mode 100644 index 0000000..cb7fe90 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionConfigBuilder.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.utils; + +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.api.common.ExecutionMode; + +import java.util.Collections; +import java.util.Map; + +/** + * Utility class for constructing an ArchivedExecutionConfig. + */ +public class ArchivedExecutionConfigBuilder { + private String executionMode; + private String restartStrategyDescription; + private int parallelism; + private boolean objectReuseEnabled; + private Map<String, String> globalJobParameters; + + public ArchivedExecutionConfigBuilder setExecutionMode(String executionMode) { + this.executionMode = executionMode; + return this; + } + + public ArchivedExecutionConfigBuilder setRestartStrategyDescription(String restartStrategyDescription) { + this.restartStrategyDescription = restartStrategyDescription; + return this; + } + + public ArchivedExecutionConfigBuilder setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + + public ArchivedExecutionConfigBuilder setObjectReuseEnabled(boolean objectReuseEnabled) { + this.objectReuseEnabled = objectReuseEnabled; + return this; + } + + public ArchivedExecutionConfigBuilder setGlobalJobParameters(Map<String, String> globalJobParameters) { + this.globalJobParameters = globalJobParameters; + return this; + } + + public ArchivedExecutionConfig build() { + return new ArchivedExecutionConfig( + executionMode != null ? executionMode : ExecutionMode.PIPELINED.name(), + restartStrategyDescription != null ? restartStrategyDescription : "default", + parallelism, + objectReuseEnabled, + globalJobParameters != null ? globalJobParameters : Collections.<String, String>emptyMap() + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java new file mode 100644 index 0000000..68077ba --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionGraphBuilder.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.utils; + +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; + +/** + * Utility class for constructing an ArchivedExecutionGraph. + */ +public class ArchivedExecutionGraphBuilder { + + private static final Random RANDOM = new Random(); + + private JobID jobID; + private String jobName; + private Map<JobVertexID, ArchivedExecutionJobVertex> tasks; + private List<ArchivedExecutionJobVertex> verticesInCreationOrder; + private long[] stateTimestamps; + private JobStatus state; + private ErrorInfo failureCause; + private String jsonPlan; + private StringifiedAccumulatorResult[] archivedUserAccumulators; + private ArchivedExecutionConfig archivedExecutionConfig; + private boolean isStoppable; + private Map<String, SerializedValue<Object>> serializedUserAccumulators; + + public ArchivedExecutionGraphBuilder setJobID(JobID jobID) { + this.jobID = jobID; + return this; + } + + public ArchivedExecutionGraphBuilder setJobName(String jobName) { + this.jobName = jobName; + return this; + } + + public ArchivedExecutionGraphBuilder setTasks(Map<JobVertexID, ArchivedExecutionJobVertex> tasks) { + this.tasks = tasks; + return this; + } + + public ArchivedExecutionGraphBuilder setVerticesInCreationOrder(List<ArchivedExecutionJobVertex> verticesInCreationOrder) { + this.verticesInCreationOrder = verticesInCreationOrder; + return this; + } + + public ArchivedExecutionGraphBuilder setStateTimestamps(long[] stateTimestamps) { + Preconditions.checkArgument(stateTimestamps.length == JobStatus.values().length); + this.stateTimestamps = stateTimestamps; + return this; + } + + public ArchivedExecutionGraphBuilder setState(JobStatus state) { + this.state = state; + return this; + } + + public ArchivedExecutionGraphBuilder setFailureCause(ErrorInfo failureCause) { + this.failureCause = failureCause; + return this; + } + + public ArchivedExecutionGraphBuilder setJsonPlan(String jsonPlan) { + this.jsonPlan = jsonPlan; + return this; + } + + public ArchivedExecutionGraphBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[] archivedUserAccumulators) { + this.archivedUserAccumulators = archivedUserAccumulators; + return this; + } + + public ArchivedExecutionGraphBuilder setArchivedExecutionConfig(ArchivedExecutionConfig archivedExecutionConfig) { + this.archivedExecutionConfig = archivedExecutionConfig; + return this; + } + + public ArchivedExecutionGraphBuilder setStoppable(boolean stoppable) { + isStoppable = stoppable; + return this; + } + + public ArchivedExecutionGraphBuilder setSerializedUserAccumulators(Map<String, SerializedValue<Object>> serializedUserAccumulators) { + this.serializedUserAccumulators = serializedUserAccumulators; + return this; + } + + public ArchivedExecutionGraph build() { + Preconditions.checkNotNull(tasks, "Tasks must not be null."); + JobID jobID = this.jobID != null ? this.jobID : new JobID(); + String jobName = this.jobName != null ? this.jobName : "job_" + RANDOM.nextInt(); + return new ArchivedExecutionGraph( + jobID, + jobName, + tasks, + verticesInCreationOrder != null ? verticesInCreationOrder : new ArrayList<>(tasks.values()), + stateTimestamps != null ? stateTimestamps : new long[JobStatus.values().length], + state != null ? state : JobStatus.FINISHED, + failureCause, + jsonPlan != null ? jsonPlan : "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}", + archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0], + serializedUserAccumulators != null ? serializedUserAccumulators : Collections.<String, SerializedValue<Object>>emptyMap(), + archivedExecutionConfig != null ? archivedExecutionConfig : new ArchivedExecutionConfigBuilder().build(), + isStoppable, + null, + null + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java new file mode 100644 index 0000000..814c4db --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionJobVertexBuilder.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.utils; + +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.Preconditions; + +import java.util.Random; + +/** + * Utility class for constructing an ArchivedExecutionJobVertex. + */ +public class ArchivedExecutionJobVertexBuilder { + + private static final Random RANDOM = new Random(); + + private ArchivedExecutionVertex[] taskVertices; + private JobVertexID id; + private String name; + private int parallelism; + private int maxParallelism; + private StringifiedAccumulatorResult[] archivedUserAccumulators; + + public ArchivedExecutionJobVertexBuilder setTaskVertices(ArchivedExecutionVertex[] taskVertices) { + this.taskVertices = taskVertices; + return this; + } + + public ArchivedExecutionJobVertexBuilder setId(JobVertexID id) { + this.id = id; + return this; + } + + public ArchivedExecutionJobVertexBuilder setName(String name) { + this.name = name; + return this; + } + + public ArchivedExecutionJobVertexBuilder setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + + public ArchivedExecutionJobVertexBuilder setMaxParallelism(int maxParallelism) { + this.maxParallelism = maxParallelism; + return this; + } + + public ArchivedExecutionJobVertexBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[] archivedUserAccumulators) { + this.archivedUserAccumulators = archivedUserAccumulators; + return this; + } + + public ArchivedExecutionJobVertex build() { + Preconditions.checkNotNull(taskVertices); + return new ArchivedExecutionJobVertex( + taskVertices, + id != null ? id : new JobVertexID(), + name != null ? name : "task_" + RANDOM.nextInt(), + parallelism, + maxParallelism, + archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0] + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionVertexBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionVertexBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionVertexBuilder.java new file mode 100644 index 0000000..935c792 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedExecutionVertexBuilder.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.utils; + +import org.apache.flink.runtime.executiongraph.ArchivedExecution; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; +import org.apache.flink.runtime.util.EvictingBoundedList; +import org.apache.flink.util.Preconditions; + +import java.util.List; +import java.util.Random; + +/** + * Utility class for constructing an ArchivedExecutionVertex. + */ +public class ArchivedExecutionVertexBuilder { + + private static final Random RANDOM = new Random(); + + private int subtaskIndex; + private EvictingBoundedList<ArchivedExecution> priorExecutions; + private String taskNameWithSubtask; + private ArchivedExecution currentExecution; + + public ArchivedExecutionVertexBuilder setSubtaskIndex(int subtaskIndex) { + this.subtaskIndex = subtaskIndex; + return this; + } + + public ArchivedExecutionVertexBuilder setPriorExecutions(List<ArchivedExecution> priorExecutions) { + this.priorExecutions = new EvictingBoundedList<>(priorExecutions.size()); + for (ArchivedExecution execution : priorExecutions) { + this.priorExecutions.add(execution); + } + return this; + } + + public ArchivedExecutionVertexBuilder setTaskNameWithSubtask(String taskNameWithSubtask) { + this.taskNameWithSubtask = taskNameWithSubtask; + return this; + } + + public ArchivedExecutionVertexBuilder setCurrentExecution(ArchivedExecution currentExecution) { + this.currentExecution = currentExecution; + return this; + } + + public ArchivedExecutionVertex build() { + Preconditions.checkNotNull(currentExecution); + return new ArchivedExecutionVertex( + subtaskIndex, + taskNameWithSubtask != null ? taskNameWithSubtask : "task_" + RANDOM.nextInt() + "_" + subtaskIndex, + currentExecution, + priorExecutions != null ? priorExecutions : new EvictingBoundedList<ArchivedExecution>(0) + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/4fc019a9/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java new file mode 100644 index 0000000..d256e92 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/utils/ArchivedJobGenerationUtils.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy.utils; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecution; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +/** + * Common entry-point for accessing generated ArchivedExecution* components. + */ +public class ArchivedJobGenerationUtils { + public static final ObjectMapper MAPPER = new ObjectMapper(); + public static final JsonFactory JACKSON_FACTORY = new JsonFactory() + .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET) + .disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT); + + private static ArchivedExecutionGraph originalJob; + private static ArchivedExecutionJobVertex originalTask; + private static ArchivedExecutionVertex originalSubtask; + private static ArchivedExecution originalAttempt; + + private static final Object lock = new Object(); + + private ArchivedJobGenerationUtils() { + } + + public static AccessExecutionGraph getTestJob() throws Exception { + synchronized (lock) { + if (originalJob == null) { + generateArchivedJob(); + } + } + return originalJob; + } + + public static AccessExecutionJobVertex getTestTask() throws Exception { + synchronized (lock) { + if (originalJob == null) { + generateArchivedJob(); + } + } + return originalTask; + } + + public static AccessExecutionVertex getTestSubtask() throws Exception { + synchronized (lock) { + if (originalJob == null) { + generateArchivedJob(); + } + } + return originalSubtask; + } + + public static AccessExecution getTestAttempt() throws Exception { + synchronized (lock) { + if (originalJob == null) { + generateArchivedJob(); + } + } + return originalAttempt; + } + + private static void generateArchivedJob() throws Exception { + // Attempt + StringifiedAccumulatorResult acc1 = new StringifiedAccumulatorResult("name1", "type1", "value1"); + StringifiedAccumulatorResult acc2 = new StringifiedAccumulatorResult("name2", "type2", "value2"); + TaskManagerLocation location = new TaskManagerLocation(new ResourceID("hello"), InetAddress.getLocalHost(), 1234); + originalAttempt = new ArchivedExecutionBuilder() + .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9}) + .setParallelSubtaskIndex(1) + .setAttemptNumber(0) + .setAssignedResourceLocation(location) + .setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2}) + .setState(ExecutionState.FINISHED) + .setFailureCause("attemptException") + .build(); + // Subtask + originalSubtask = new ArchivedExecutionVertexBuilder() + .setSubtaskIndex(originalAttempt.getParallelSubtaskIndex()) + .setTaskNameWithSubtask("hello(1/1)") + .setCurrentExecution(originalAttempt) + .build(); + // Task + originalTask = new ArchivedExecutionJobVertexBuilder() + .setTaskVertices(new ArchivedExecutionVertex[]{originalSubtask}) + .build(); + // Job + Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>(); + tasks.put(originalTask.getJobVertexId(), originalTask); + originalJob = new ArchivedExecutionGraphBuilder() + .setJobID(new JobID()) + .setTasks(tasks) + .setFailureCause(new ErrorInfo(new Exception("jobException"), originalAttempt.getStateTimestamp(ExecutionState.FAILED))) + .setState(JobStatus.FINISHED) + .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + .setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2}) + .build(); + } + + // ======================================================================== + // utility methods + // ======================================================================== + + public static void compareStringifiedAccumulators(StringifiedAccumulatorResult[] expectedAccs, ArrayNode writtenAccs) { + assertEquals(expectedAccs.length, writtenAccs.size()); + for (int x = 0; x < expectedAccs.length; x++) { + JsonNode acc = writtenAccs.get(x); + + assertEquals(expectedAccs[x].getName(), acc.get("name").asText()); + assertEquals(expectedAccs[x].getType(), acc.get("type").asText()); + assertEquals(expectedAccs[x].getValue(), acc.get("value").asText()); + } + } + + public static void compareIoMetrics(IOMetrics expectedMetrics, JsonNode writtenMetrics) { + assertEquals(expectedMetrics.getNumBytesInTotal(), writtenMetrics.get("read-bytes").asLong()); + assertEquals(expectedMetrics.getNumBytesOut(), writtenMetrics.get("write-bytes").asLong()); + assertEquals(expectedMetrics.getNumRecordsIn(), writtenMetrics.get("read-records").asLong()); + assertEquals(expectedMetrics.getNumRecordsOut(), writtenMetrics.get("write-records").asLong()); + } +}
