Repository: flink Updated Branches: refs/heads/master 51b7ede28 -> a552d6746
http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java index e9c9f84..54f3f9c 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -17,6 +17,12 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; @@ -28,4 +34,27 @@ public class SubtaskExecutionAttemptDetailsHandlerTest { Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]); } + + @Test + public void testJsonGeneration() throws Exception { + AccessExecutionGraph originalJob = ArchivedJobGenerationUtils.getTestJob(); + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); + String json = SubtaskExecutionAttemptDetailsHandler.createAttemptDetailsJson( + originalAttempt, originalJob.getJobID().toString(), originalTask.getJobVertexId().toString(), null); + + JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); + + Assert.assertEquals(originalAttempt.getParallelSubtaskIndex(), result.get("subtask").asInt()); + Assert.assertEquals(originalAttempt.getState().name(), result.get("status").asText()); + Assert.assertEquals(originalAttempt.getAttemptNumber(), result.get("attempt").asInt()); + Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(), result.get("host").asText()); + long start = originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING); + Assert.assertEquals(start, result.get("start-time").asLong()); + long end = originalAttempt.getStateTimestamp(ExecutionState.FINISHED); + Assert.assertEquals(end, result.get("end-time").asLong()); + Assert.assertEquals(end - start, result.get("duration").asLong()); + + ArchivedJobGenerationUtils.compareIoMetrics(originalAttempt.getIOMetrics(), result.get("metrics")); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java index 1efb260..954ebad 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksAllAccumulatorsHandlerTest.java @@ -17,6 +17,11 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; @@ -28,4 +33,31 @@ public class SubtasksAllAccumulatorsHandlerTest { Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", paths[0]); } + + @Test + public void testJsonGeneration() throws Exception { + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + String json = SubtasksAllAccumulatorsHandler.createSubtasksAccumulatorsJson(originalTask); + + JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); + + Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); + Assert.assertEquals(originalTask.getParallelism(), result.get("parallelism").asInt()); + + ArrayNode subtasks = (ArrayNode) result.get("subtasks"); + + Assert.assertEquals(originalTask.getTaskVertices().length, subtasks.size()); + for (int x = 0; x < originalTask.getTaskVertices().length; x++) { + JsonNode subtask = subtasks.get(x); + AccessExecutionVertex expectedSubtask = originalTask.getTaskVertices()[x]; + + Assert.assertEquals(x, subtask.get("subtask").asInt()); + Assert.assertEquals(expectedSubtask.getCurrentExecutionAttempt().getAttemptNumber(), subtask.get("attempt").asInt()); + Assert.assertEquals(expectedSubtask.getCurrentAssignedResourceLocation().getHostname(), subtask.get("host").asText()); + + ArchivedJobGenerationUtils.compareStringifiedAccumulators( + expectedSubtask.getCurrentExecutionAttempt().getUserAccumulatorsStringified(), + (ArrayNode) subtask.get("user-accumulators")); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java index 1c8d2bd..939f439 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/SubtasksTimesHandlerTest.java @@ -17,6 +17,12 @@ */ package org.apache.flink.runtime.webmonitor.handlers; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.webmonitor.utils.ArchivedJobGenerationUtils; import org.junit.Assert; import org.junit.Test; @@ -28,4 +34,35 @@ public class SubtasksTimesHandlerTest { Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]); } + + @Test + public void testJsonGeneration() throws Exception { + AccessExecutionJobVertex originalTask = ArchivedJobGenerationUtils.getTestTask(); + AccessExecution originalAttempt = ArchivedJobGenerationUtils.getTestAttempt(); + String json = SubtasksTimesHandler.createSubtaskTimesJson(originalTask); + + JsonNode result = ArchivedJobGenerationUtils.mapper.readTree(json); + + Assert.assertEquals(originalTask.getJobVertexId().toString(), result.get("id").asText()); + Assert.assertEquals(originalTask.getName(), result.get("name").asText()); + Assert.assertTrue(result.get("now").asLong() > 0L); + + ArrayNode subtasks = (ArrayNode) result.get("subtasks"); + + JsonNode subtask = subtasks.get(0); + Assert.assertEquals(0, subtask.get("subtask").asInt()); + Assert.assertEquals(originalAttempt.getAssignedResourceLocation().getHostname(), subtask.get("host").asText()); + Assert.assertEquals(originalAttempt.getStateTimestamp(originalAttempt.getState()) - originalAttempt.getStateTimestamp(ExecutionState.SCHEDULED), subtask.get("duration").asLong()); + + JsonNode timestamps = subtask.get("timestamps"); + + Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CREATED), timestamps.get(ExecutionState.CREATED.name()).asLong()); + Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.SCHEDULED), timestamps.get(ExecutionState.SCHEDULED.name()).asLong()); + Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.DEPLOYING), timestamps.get(ExecutionState.DEPLOYING.name()).asLong()); + Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.RUNNING), timestamps.get(ExecutionState.RUNNING.name()).asLong()); + Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.FINISHED), timestamps.get(ExecutionState.FINISHED.name()).asLong()); + Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CANCELING), timestamps.get(ExecutionState.CANCELING.name()).asLong()); + Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.CANCELED), timestamps.get(ExecutionState.CANCELED.name()).asLong()); + Assert.assertEquals(originalAttempt.getStateTimestamp(ExecutionState.FAILED), timestamps.get(ExecutionState.FAILED.name()).asLong()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java new file mode 100644 index 0000000..98fc92d --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionBuilder.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.utils; + +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.MeterView; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.ArchivedExecution; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.Preconditions; + +import java.net.InetAddress; +import java.net.UnknownHostException; + +public class ArchivedExecutionBuilder { + private ExecutionAttemptID attemptId; + private long[] stateTimestamps; + private int attemptNumber; + private ExecutionState state; + private String failureCause; + private TaskManagerLocation assignedResourceLocation; + private StringifiedAccumulatorResult[] userAccumulators; + private IOMetrics ioMetrics; + private int parallelSubtaskIndex; + + public ArchivedExecutionBuilder setAttemptId(ExecutionAttemptID attemptId) { + this.attemptId = attemptId; + return this; + } + + public ArchivedExecutionBuilder setStateTimestamps(long[] stateTimestamps) { + Preconditions.checkArgument(stateTimestamps.length == ExecutionState.values().length); + this.stateTimestamps = stateTimestamps; + return this; + } + + public ArchivedExecutionBuilder setAttemptNumber(int attemptNumber) { + this.attemptNumber = attemptNumber; + return this; + } + + public ArchivedExecutionBuilder setState(ExecutionState state) { + this.state = state; + return this; + } + + public ArchivedExecutionBuilder setFailureCause(String failureCause) { + this.failureCause = failureCause; + return this; + } + + public ArchivedExecutionBuilder setAssignedResourceLocation(TaskManagerLocation assignedResourceLocation) { + this.assignedResourceLocation = assignedResourceLocation; + return this; + } + + public ArchivedExecutionBuilder setUserAccumulators(StringifiedAccumulatorResult[] userAccumulators) { + this.userAccumulators = userAccumulators; + return this; + } + + public ArchivedExecutionBuilder setParallelSubtaskIndex(int parallelSubtaskIndex) { + this.parallelSubtaskIndex = parallelSubtaskIndex; + return this; + } + + public ArchivedExecutionBuilder setIOMetrics(IOMetrics ioMetrics) { + this.ioMetrics = ioMetrics; + return this; + } + + public ArchivedExecution build() throws UnknownHostException { + return new ArchivedExecution( + userAccumulators != null ? userAccumulators : new StringifiedAccumulatorResult[0], + ioMetrics != null ? ioMetrics : new TestIOMetrics(), + attemptId != null ? attemptId : new ExecutionAttemptID(), + attemptNumber, + state != null ? state : ExecutionState.FINISHED, + failureCause != null ? failureCause : "(null)", + assignedResourceLocation != null ? assignedResourceLocation : new TaskManagerLocation(new ResourceID("tm"), InetAddress.getLocalHost(), 1234), + parallelSubtaskIndex, + stateTimestamps != null ? stateTimestamps : new long[]{1, 2, 3, 4, 5, 5, 5, 5} + ); + } + + private static class TestIOMetrics extends IOMetrics { + private static final long serialVersionUID = -5920076211680012555L; + + public TestIOMetrics() { + super( + new MeterView(new TestCounter(1), 0), + new MeterView(new TestCounter(2), 0), + new MeterView(new TestCounter(3), 0), + new MeterView(new TestCounter(4), 0), + new MeterView(new TestCounter(5), 0)); + } + } + + private static class TestCounter implements Counter { + private final long count; + + private TestCounter(long count) { + this.count = count; + } + + @Override + public void inc() { + } + + @Override + public void inc(long n) { + } + + @Override + public void dec() { + } + + @Override + public void dec(long n) { + } + + @Override + public long getCount() { + return count; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java new file mode 100644 index 0000000..0880133 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionConfigBuilder.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.utils; + +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.api.common.ExecutionMode; + +import java.util.Collections; +import java.util.Map; + +public class ArchivedExecutionConfigBuilder { + private String executionMode; + private String restartStrategyDescription; + private int parallelism; + private boolean objectReuseEnabled; + private Map<String, String> globalJobParameters; + + public ArchivedExecutionConfigBuilder setExecutionMode(String executionMode) { + this.executionMode = executionMode; + return this; + } + + public ArchivedExecutionConfigBuilder setRestartStrategyDescription(String restartStrategyDescription) { + this.restartStrategyDescription = restartStrategyDescription; + return this; + } + + public ArchivedExecutionConfigBuilder setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + + public ArchivedExecutionConfigBuilder setObjectReuseEnabled(boolean objectReuseEnabled) { + this.objectReuseEnabled = objectReuseEnabled; + return this; + } + + public ArchivedExecutionConfigBuilder setGlobalJobParameters(Map<String, String> globalJobParameters) { + this.globalJobParameters = globalJobParameters; + return this; + } + + public ArchivedExecutionConfig build() { + return new ArchivedExecutionConfig( + executionMode != null ? executionMode : ExecutionMode.PIPELINED.name(), + restartStrategyDescription != null ? restartStrategyDescription : "default", + parallelism, + objectReuseEnabled, + globalJobParameters != null ? globalJobParameters : Collections.<String, String>emptyMap() + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java new file mode 100644 index 0000000..1514a5a --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionGraphBuilder.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.utils; + +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializedValue; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; + +public class ArchivedExecutionGraphBuilder { + + private static final Random RANDOM = new Random(); + + private JobID jobID; + private String jobName; + private Map<JobVertexID, ArchivedExecutionJobVertex> tasks; + private List<ArchivedExecutionJobVertex> verticesInCreationOrder; + private long[] stateTimestamps; + private JobStatus state; + private String failureCause; + private String jsonPlan; + private StringifiedAccumulatorResult[] archivedUserAccumulators; + private ArchivedExecutionConfig archivedExecutionConfig; + private boolean isStoppable; + private Map<String, SerializedValue<Object>> serializedUserAccumulators; + + public ArchivedExecutionGraphBuilder setJobID(JobID jobID) { + this.jobID = jobID; + return this; + } + + public ArchivedExecutionGraphBuilder setJobName(String jobName) { + this.jobName = jobName; + return this; + } + + public ArchivedExecutionGraphBuilder setTasks(Map<JobVertexID, ArchivedExecutionJobVertex> tasks) { + this.tasks = tasks; + return this; + } + + public ArchivedExecutionGraphBuilder setVerticesInCreationOrder(List<ArchivedExecutionJobVertex> verticesInCreationOrder) { + this.verticesInCreationOrder = verticesInCreationOrder; + return this; + } + + public ArchivedExecutionGraphBuilder setStateTimestamps(long[] stateTimestamps) { + Preconditions.checkArgument(stateTimestamps.length == JobStatus.values().length); + this.stateTimestamps = stateTimestamps; + return this; + } + + public ArchivedExecutionGraphBuilder setState(JobStatus state) { + this.state = state; + return this; + } + + public ArchivedExecutionGraphBuilder setFailureCause(String failureCause) { + this.failureCause = failureCause; + return this; + } + + public ArchivedExecutionGraphBuilder setJsonPlan(String jsonPlan) { + this.jsonPlan = jsonPlan; + return this; + } + + public ArchivedExecutionGraphBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[] archivedUserAccumulators) { + this.archivedUserAccumulators = archivedUserAccumulators; + return this; + } + + public ArchivedExecutionGraphBuilder setArchivedExecutionConfig(ArchivedExecutionConfig archivedExecutionConfig) { + this.archivedExecutionConfig = archivedExecutionConfig; + return this; + } + + public ArchivedExecutionGraphBuilder setStoppable(boolean stoppable) { + isStoppable = stoppable; + return this; + } + + public ArchivedExecutionGraphBuilder setSerializedUserAccumulators(Map<String, SerializedValue<Object>> serializedUserAccumulators) { + this.serializedUserAccumulators = serializedUserAccumulators; + return this; + } + + public ArchivedExecutionGraph build() { + Preconditions.checkNotNull(tasks, "Tasks must not be null."); + JobID jobID = this.jobID != null ? this.jobID : new JobID(); + String jobName = this.jobName != null ? this.jobName : "job_" + RANDOM.nextInt(); + return new ArchivedExecutionGraph( + jobID, + jobName, + tasks, + verticesInCreationOrder != null ? verticesInCreationOrder : new ArrayList<>(tasks.values()), + stateTimestamps != null ? stateTimestamps : new long[JobStatus.values().length], + state != null ? state : JobStatus.FINISHED, + failureCause != null ? failureCause : "(null)", + jsonPlan != null ? jsonPlan : "{\"jobid\":\"" + jobID + "\", \"name\":\"" + jobName + "\", \"nodes\":[]}", + archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0], + serializedUserAccumulators != null ? serializedUserAccumulators : Collections.<String, SerializedValue<Object>>emptyMap(), + archivedExecutionConfig != null ? archivedExecutionConfig : new ArchivedExecutionConfigBuilder().build(), + isStoppable, + null, + null + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java new file mode 100644 index 0000000..8a45d35 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionJobVertexBuilder.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.utils; + +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.util.Preconditions; + +import java.util.Random; + +public class ArchivedExecutionJobVertexBuilder { + + private static final Random RANDOM = new Random(); + + private ArchivedExecutionVertex[] taskVertices; + private JobVertexID id; + private String name; + private int parallelism; + private int maxParallelism; + private StringifiedAccumulatorResult[] archivedUserAccumulators; + + public ArchivedExecutionJobVertexBuilder setTaskVertices(ArchivedExecutionVertex[] taskVertices) { + this.taskVertices = taskVertices; + return this; + } + + public ArchivedExecutionJobVertexBuilder setId(JobVertexID id) { + this.id = id; + return this; + } + + public ArchivedExecutionJobVertexBuilder setName(String name) { + this.name = name; + return this; + } + + public ArchivedExecutionJobVertexBuilder setParallelism(int parallelism) { + this.parallelism = parallelism; + return this; + } + + public ArchivedExecutionJobVertexBuilder setMaxParallelism(int maxParallelism) { + this.maxParallelism = maxParallelism; + return this; + } + + public ArchivedExecutionJobVertexBuilder setArchivedUserAccumulators(StringifiedAccumulatorResult[] archivedUserAccumulators) { + this.archivedUserAccumulators = archivedUserAccumulators; + return this; + } + + public ArchivedExecutionJobVertex build() { + Preconditions.checkNotNull(taskVertices); + return new ArchivedExecutionJobVertex( + taskVertices, + id != null ? id : new JobVertexID(), + name != null ? name : "task_" + RANDOM.nextInt(), + parallelism, + maxParallelism, + archivedUserAccumulators != null ? archivedUserAccumulators : new StringifiedAccumulatorResult[0] + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java new file mode 100644 index 0000000..3707374 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedExecutionVertexBuilder.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.utils; + +import org.apache.flink.runtime.executiongraph.ArchivedExecution; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; +import org.apache.flink.runtime.util.EvictingBoundedList; +import org.apache.flink.util.Preconditions; + +import java.util.List; +import java.util.Random; + +public class ArchivedExecutionVertexBuilder { + + private static final Random RANDOM = new Random(); + + private int subtaskIndex; + private EvictingBoundedList<ArchivedExecution> priorExecutions; + private String taskNameWithSubtask; + private ArchivedExecution currentExecution; + + public ArchivedExecutionVertexBuilder setSubtaskIndex(int subtaskIndex) { + this.subtaskIndex = subtaskIndex; + return this; + } + + public ArchivedExecutionVertexBuilder setPriorExecutions(List<ArchivedExecution> priorExecutions) { + this.priorExecutions = new EvictingBoundedList<>(priorExecutions.size()); + for (ArchivedExecution execution : priorExecutions) { + this.priorExecutions.add(execution); + } + return this; + } + + public ArchivedExecutionVertexBuilder setTaskNameWithSubtask(String taskNameWithSubtask) { + this.taskNameWithSubtask = taskNameWithSubtask; + return this; + } + + public ArchivedExecutionVertexBuilder setCurrentExecution(ArchivedExecution currentExecution) { + this.currentExecution = currentExecution; + return this; + } + + public ArchivedExecutionVertex build() { + Preconditions.checkNotNull(currentExecution); + return new ArchivedExecutionVertex( + subtaskIndex, + taskNameWithSubtask != null ? taskNameWithSubtask : "task_" + RANDOM.nextInt() + "_" + subtaskIndex, + currentExecution, + priorExecutions != null ? priorExecutions : new EvictingBoundedList<ArchivedExecution>(0) + ); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java new file mode 100644 index 0000000..0340d87 --- /dev/null +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/utils/ArchivedJobGenerationUtils.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.webmonitor.utils; + +import com.fasterxml.jackson.core.JsonFactory; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ArrayNode; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecution; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.AccessExecutionVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecution; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionJobVertex; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionVertex; +import org.apache.flink.runtime.executiongraph.IOMetrics; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; + +import java.net.InetAddress; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class ArchivedJobGenerationUtils { + public static final ObjectMapper mapper = new ObjectMapper(); + public static final JsonFactory jacksonFactory = new JsonFactory() + .enable(JsonGenerator.Feature.AUTO_CLOSE_TARGET) + .disable(JsonGenerator.Feature.AUTO_CLOSE_JSON_CONTENT); + + private static ArchivedExecutionGraph originalJob; + private static ArchivedExecutionJobVertex originalTask; + private static ArchivedExecutionVertex originalSubtask; + private static ArchivedExecution originalAttempt; + + private static final Object lock = new Object(); + + private ArchivedJobGenerationUtils() { + } + + public static AccessExecutionGraph getTestJob() throws Exception { + synchronized (lock) { + if (originalJob == null) { + generateArchivedJob(); + } + } + return originalJob; + } + + public static AccessExecutionJobVertex getTestTask() throws Exception { + synchronized (lock) { + if (originalJob == null) { + generateArchivedJob(); + } + } + return originalTask; + } + + public static AccessExecutionVertex getTestSubtask() throws Exception { + synchronized (lock) { + if (originalJob == null) { + generateArchivedJob(); + } + } + return originalSubtask; + } + + public static AccessExecution getTestAttempt() throws Exception { + synchronized (lock) { + if (originalJob == null) { + generateArchivedJob(); + } + } + return originalAttempt; + } + + private static void generateArchivedJob() throws Exception { + // Attempt + StringifiedAccumulatorResult acc1 = new StringifiedAccumulatorResult("name1", "type1", "value1"); + StringifiedAccumulatorResult acc2 = new StringifiedAccumulatorResult("name2", "type2", "value2"); + TaskManagerLocation location = new TaskManagerLocation(new ResourceID("hello"), InetAddress.getLocalHost(), 1234); + originalAttempt = new ArchivedExecutionBuilder() + .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9}) + .setParallelSubtaskIndex(1) + .setAttemptNumber(3) + .setAssignedResourceLocation(location) + .setUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2}) + .setState(ExecutionState.FINISHED) + .setFailureCause("attemptException") + .build(); + // Subtask + originalSubtask = new ArchivedExecutionVertexBuilder() + .setSubtaskIndex(originalAttempt.getParallelSubtaskIndex()) + .setTaskNameWithSubtask("hello(1/1)") + .setCurrentExecution(originalAttempt) + .build(); + // Task + originalTask = new ArchivedExecutionJobVertexBuilder() + .setTaskVertices(new ArchivedExecutionVertex[]{originalSubtask}) + .build(); + // Job + Map<JobVertexID, ArchivedExecutionJobVertex> tasks = new HashMap<>(); + tasks.put(originalTask.getJobVertexId(), originalTask); + originalJob = new ArchivedExecutionGraphBuilder() + .setJobID(new JobID()) + .setTasks(tasks) + .setFailureCause("jobException") + .setState(JobStatus.FINISHED) + .setStateTimestamps(new long[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}) + .setArchivedUserAccumulators(new StringifiedAccumulatorResult[]{acc1, acc2}) + .build(); + } + + // ======================================================================== + // utility methods + // ======================================================================== + + public static void compareStringifiedAccumulators(StringifiedAccumulatorResult[] expectedAccs, ArrayNode writtenAccs) { + assertEquals(expectedAccs.length, writtenAccs.size()); + for (int x = 0; x < expectedAccs.length; x++) { + JsonNode acc = writtenAccs.get(x); + + assertEquals(expectedAccs[x].getName(), acc.get("name").asText()); + assertEquals(expectedAccs[x].getType(), acc.get("type").asText()); + assertEquals(expectedAccs[x].getValue(), acc.get("value").asText()); + } + } + + public static void compareIoMetrics(IOMetrics expectedMetrics, JsonNode writtenMetrics) { + assertEquals(expectedMetrics.getNumBytesInTotal(), writtenMetrics.get("read-bytes").asLong()); + assertEquals(expectedMetrics.getNumBytesOut(), writtenMetrics.get("write-bytes").asLong()); + assertEquals(expectedMetrics.getNumRecordsIn(), writtenMetrics.get("read-records").asLong()); + assertEquals(expectedMetrics.getNumRecordsOut(), writtenMetrics.get("write-records").asLong()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java index c189d42..4b1c62f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java @@ -59,6 +59,21 @@ public class ArchivedExecution implements AccessExecution, Serializable { this.ioMetrics = execution.getIOMetrics(); } + public ArchivedExecution( + StringifiedAccumulatorResult[] userAccumulators, IOMetrics ioMetrics, + ExecutionAttemptID attemptId, int attemptNumber, ExecutionState state, String failureCause, + TaskManagerLocation assignedResourceLocation, int parallelSubtaskIndex, long[] stateTimestamps) { + this.userAccumulators = userAccumulators; + this.ioMetrics = ioMetrics; + this.failureCause = failureCause; + this.assignedResourceLocation = assignedResourceLocation; + this.attemptNumber = attemptNumber; + this.attemptId = attemptId; + this.state = state; + this.stateTimestamps = stateTimestamps; + this.parallelSubtaskIndex = parallelSubtaskIndex; + } + // -------------------------------------------------------------------------------------------- // Accessors // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java index c744907..6b54760 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionJobVertex.java @@ -54,6 +54,21 @@ public class ArchivedExecutionJobVertex implements AccessExecutionJobVertex, Ser this.maxParallelism = jobVertex.getMaxParallelism(); } + public ArchivedExecutionJobVertex( + ArchivedExecutionVertex[] taskVertices, + JobVertexID id, + String name, + int parallelism, + int maxParallelism, + StringifiedAccumulatorResult[] archivedUserAccumulators) { + this.taskVertices = taskVertices; + this.id = id; + this.name = name; + this.parallelism = parallelism; + this.maxParallelism = maxParallelism; + this.archivedUserAccumulators = archivedUserAccumulators; + } + // -------------------------------------------------------------------------------------------- // Accessors // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java index 5053cae..36669d3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionVertex.java @@ -45,6 +45,15 @@ public class ArchivedExecutionVertex implements AccessExecutionVertex, Serializa this.currentExecution = vertex.getCurrentExecutionAttempt().archive(); } + public ArchivedExecutionVertex( + int subTaskIndex, String taskNameWithSubtask, + ArchivedExecution currentExecution, EvictingBoundedList<ArchivedExecution> priorExecutions) { + this.subTaskIndex = subTaskIndex; + this.taskNameWithSubtask = taskNameWithSubtask; + this.currentExecution = currentExecution; + this.priorExecutions = priorExecutions; + } + // -------------------------------------------------------------------------------------------- // Accessors // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java index 15c54b4..e0472ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IOMetrics.java @@ -26,19 +26,19 @@ import java.io.Serializable; */ public class IOMetrics implements Serializable { private static final long serialVersionUID = -7208093607556457183L; - private final long numRecordsIn; - private final long numRecordsOut; + protected long numRecordsIn; + protected long numRecordsOut; - private final double numRecordsInPerSecond; - private final double numRecordsOutPerSecond; + protected double numRecordsInPerSecond; + protected double numRecordsOutPerSecond; - private final long numBytesInLocal; - private final long numBytesInRemote; - private final long numBytesOut; + protected long numBytesInLocal; + protected long numBytesInRemote; + protected long numBytesOut; - private final double numBytesInLocalPerSecond; - private final double numBytesInRemotePerSecond; - private final double numBytesOutPerSecond; + protected double numBytesInLocalPerSecond; + protected double numBytesInRemotePerSecond; + protected double numBytesOutPerSecond; public IOMetrics(Meter recordsIn, Meter recordsOut, Meter bytesLocalIn, Meter bytesRemoteIn, Meter bytesOut) { this.numRecordsIn = recordsIn.getCount(); @@ -53,6 +53,22 @@ public class IOMetrics implements Serializable { this.numBytesOutPerSecond = bytesOut.getRate(); } + public IOMetrics( + int numBytesInLocal, int numBytesInRemote, int numBytesOut, int numRecordsIn, int numRecordsOut, + double numBytesInLocalPerSecond, double numBytesInRemotePerSecond, double numBytesOutPerSecond, + double numRecordsInPerSecond, double numRecordsOutPerSecond) { + this.numBytesInLocal = numBytesInLocal; + this.numBytesInRemote = numBytesInRemote; + this.numBytesOut = numBytesOut; + this.numRecordsIn = numRecordsIn; + this.numRecordsOut = numRecordsOut; + this.numBytesInLocalPerSecond = numBytesInLocalPerSecond; + this.numBytesInRemotePerSecond = numBytesInRemotePerSecond; + this.numBytesOutPerSecond = numBytesOutPerSecond; + this.numRecordsInPerSecond = numRecordsInPerSecond; + this.numRecordsOutPerSecond = numRecordsOutPerSecond; + } + public long getNumRecordsIn() { return numRecordsIn; } http://git-wip-us.apache.org/repos/asf/flink/blob/a552d674/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 3090172..b3e9d5d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -106,9 +106,13 @@ public class ArchivedExecutionGraphTest { new NoRestartStrategy()); runtimeGraph.attachJobGraph(vertices); + List<ExecutionJobVertex> jobVertices = new ArrayList<>(); + jobVertices.add(runtimeGraph.getJobVertex(v1ID)); + jobVertices.add(runtimeGraph.getJobVertex(v2ID)); + CheckpointStatsTracker statsTracker = new CheckpointStatsTracker( 0, - Collections.<ExecutionJobVertex>emptyList(), + jobVertices, mock(JobSnapshottingSettings.class), new UnregisteredMetricsGroup());
