This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit bb8e1d14f05aca186ec874437eba3d44fbb3bd97 Author: Yangze Guo <[email protected]> AuthorDate: Tue May 24 11:25:17 2022 +0800 [FLINK-28309][rest] Introduce metrics of the duration that a task stays in each status This closes #20111. --- .../shortcodes/generated/rest_v1_dispatcher.html | 18 +++++ docs/static/generated/rest_v1_dispatcher.yml | 5 ++ .../src/test/resources/rest_api_v1.snapshot | 18 +++++ .../runtime/executiongraph/AccessExecution.java | 15 +++++ .../runtime/executiongraph/ArchivedExecution.java | 19 +++++- .../flink/runtime/executiongraph/Execution.java | 29 +++++++- .../job/SubtaskExecutionAttemptDetailsInfo.java | 77 ++++++++++++++++++++-- .../executiongraph/ArchivedExecutionGraphTest.java | 30 +++++++++ .../executiongraph/ExecutionHistoryTest.java | 1 + .../rest/handler/job/JobExceptionsHandlerTest.java | 4 +- .../SubtaskCurrentAttemptDetailsHandlerTest.java | 16 ++++- ...askExecutionAttemptAccumulatorsHandlerTest.java | 1 + .../SubtaskExecutionAttemptDetailsHandlerTest.java | 12 +++- .../rest/messages/JobVertexDetailsInfoTest.java | 10 ++- .../SubtaskExecutionAttemptDetailsInfoTest.java | 12 +++- .../exceptionhistory/TestingAccessExecution.java | 10 +++ 16 files changed, 260 insertions(+), 17 deletions(-) diff --git a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html index 4b648e77e72..3f56fcef74b 100644 --- a/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html +++ b/docs/layouts/shortcodes/generated/rest_v1_dispatcher.html @@ -3712,6 +3712,12 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa "type" : "string", "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ] }, + "status-duration" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } + }, "subtask" : { "type" : "integer" }, @@ -4406,6 +4412,12 @@ Using 'curl' you can upload a 
jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa "type" : "string", "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ] }, + "status-duration" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } + }, "subtask" : { "type" : "integer" }, @@ -4543,6 +4555,12 @@ Using 'curl' you can upload a jar via 'curl -X POST -H "Expect:" -F "jarfile=@pa "type" : "string", "enum" : [ "CREATED", "SCHEDULED", "DEPLOYING", "RUNNING", "FINISHED", "CANCELING", "CANCELED", "FAILED", "RECONCILING", "INITIALIZING" ] }, + "status-duration" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } + }, "subtask" : { "type" : "integer" }, diff --git a/docs/static/generated/rest_v1_dispatcher.yml b/docs/static/generated/rest_v1_dispatcher.yml index 259d5b6d9ba..4b85119cd68 100644 --- a/docs/static/generated/rest_v1_dispatcher.yml +++ b/docs/static/generated/rest_v1_dispatcher.yml @@ -1687,6 +1687,11 @@ components: $ref: '#/components/schemas/IOMetricsInfo' taskmanager-id: type: string + status-duration: + type: object + additionalProperties: + type: integer + format: int64 JobResult: type: object properties: diff --git a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot index 99cd6fb0cab..30cbe20f7d1 100644 --- a/flink-runtime-web/src/test/resources/rest_api_v1.snapshot +++ b/flink-runtime-web/src/test/resources/rest_api_v1.snapshot @@ -2191,6 +2191,12 @@ "taskmanager-id" : { "type" : "string" }, + "status-duration" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } + }, "start_time" : { "type" : "integer" } @@ -2561,6 +2567,12 @@ "taskmanager-id" : { "type" : "string" }, + "status-duration" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } + }, "start_time" : { "type" : "integer" } @@ -2656,6 +2668,12 @@ "taskmanager-id" : { "type" : 
"string" }, + "status-duration" : { + "type" : "object", + "additionalProperties" : { + "type" : "integer" + } + }, "start_time" : { "type" : "integer" } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java index b0da913b4dd..c8a5dcf7f00 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecution.java @@ -47,6 +47,13 @@ public interface AccessExecution { */ long[] getStateTimestamps(); + /** + * Returns the timestamps at which this execution left each {@link ExecutionState}. + * + * @return end timestamps for each state, indexed by {@link ExecutionState#ordinal()} + */ + long[] getStateEndTimestamps(); + /** * Returns the current {@link ExecutionState} for this execution. * @@ -79,6 +86,14 @@ public interface AccessExecution { */ long getStateTimestamp(ExecutionState state); + /** + * Returns the timestamp at which this execution left the given {@link ExecutionState}. + * + * @param state state for which the end timestamp should be returned + * @return end timestamp for the given state + */ + long getStateEndTimestamp(ExecutionState state); + /** * Returns the user-defined accumulators as strings.
* diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java index a270d072c4d..7cb2d8a39da 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ArchivedExecution.java @@ -37,6 +37,8 @@ public class ArchivedExecution implements AccessExecution, Serializable { private final long[] stateTimestamps; + private final long[] stateEndTimestamps; + private final ExecutionState state; @Nullable private final ErrorInfo failureInfo; // once assigned, never changes @@ -59,7 +61,8 @@ public class ArchivedExecution implements AccessExecution, Serializable { execution.getFailureInfo().orElse(null), execution.getAssignedResourceLocation(), execution.getAssignedAllocationID(), - execution.getStateTimestamps()); + execution.getStateTimestamps(), + execution.getStateEndTimestamps()); } public ArchivedExecution( @@ -70,7 +73,8 @@ public class ArchivedExecution implements AccessExecution, Serializable { @Nullable ErrorInfo failureCause, TaskManagerLocation assignedResourceLocation, AllocationID assignedAllocationID, - long[] stateTimestamps) { + long[] stateTimestamps, + long[] stateEndTimestamps) { this.userAccumulators = userAccumulators; this.ioMetrics = ioMetrics; this.failureInfo = failureCause; @@ -78,6 +82,7 @@ public class ArchivedExecution implements AccessExecution, Serializable { this.attemptId = attemptId; this.state = state; this.stateTimestamps = stateTimestamps; + this.stateEndTimestamps = stateEndTimestamps; this.assignedAllocationID = assignedAllocationID; } @@ -100,6 +105,11 @@ public class ArchivedExecution implements AccessExecution, Serializable { return stateTimestamps; } + @Override + public long[] getStateEndTimestamps() { + return stateEndTimestamps; + } + @Override public ExecutionState getState() { 
return state; @@ -124,6 +134,11 @@ public class ArchivedExecution implements AccessExecution, Serializable { return this.stateTimestamps[state.ordinal()]; } + @Override + public long getStateEndTimestamp(ExecutionState state) { + return this.stateEndTimestamps[state.ordinal()]; + } + @Override public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() { return userAccumulators; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 79c2cb9602f..459d9861980 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -133,6 +133,12 @@ public class Execution */ private final long[] stateTimestamps; + /** + * The timestamps at which this execution left each state, indexed by {@link + * ExecutionState#ordinal()}. + */ + private final long[] stateEndTimestamps; + private final Time rpcTimeout; private final Collection<PartitionInfo> partitionInfos; @@ -211,6 +217,7 @@ public class Execution this.rpcTimeout = checkNotNull(rpcTimeout); this.stateTimestamps = new long[ExecutionState.values().length]; + this.stateEndTimestamps = new long[ExecutionState.values().length]; markTimestamp(CREATED, startTimestamp); this.partitionInfos = new ArrayList<>(16); @@ -337,11 +344,21 @@ public class Execution return stateTimestamps; } + @Override + public long[] getStateEndTimestamps() { + return stateEndTimestamps; + } + @Override public long getStateTimestamp(ExecutionState state) { return this.stateTimestamps[state.ordinal()]; } + @Override + public long getStateEndTimestamp(ExecutionState state) { + return this.stateEndTimestamps[state.ordinal()]; + } + public boolean isFinished() { return state.isTerminal(); } @@ -1405,7 +1422,7 @@ public class Execution if (state == currentState) { state = targetState; -
markTimestamp(targetState); + markTimestamp(currentState, targetState); if (error == null) { LOG.info( @@ -1454,14 +1471,20 @@ public class Execution } } - private void markTimestamp(ExecutionState state) { - markTimestamp(state, System.currentTimeMillis()); + private void markTimestamp(ExecutionState currentState, ExecutionState targetState) { + long now = System.currentTimeMillis(); + markTimestamp(targetState, now); + markEndTimestamp(currentState, now); } private void markTimestamp(ExecutionState state, long timestamp) { this.stateTimestamps[state.ordinal()] = timestamp; } + private void markEndTimestamp(ExecutionState state, long timestamp) { + this.stateEndTimestamps[state.ordinal()] = timestamp; + } + public String getVertexWithAttempt() { return vertex.getTaskNameWithSubtaskIndex() + " - execution #" + getAttemptNumber(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java index 637538f1f48..a1a3115369c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfo.java @@ -36,6 +36,8 @@ import io.swagger.v3.oas.annotations.Hidden; import javax.annotation.Nullable; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; /** The sub task execution attempt response. 
*/ @@ -61,6 +63,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { public static final String FIELD_NAME_TASKMANAGER_ID = "taskmanager-id"; + public static final String FIELD_NAME_STATUS_DURATION = "status-duration"; + @JsonProperty(FIELD_NAME_SUBTASK_INDEX) private final int subtaskIndex; @@ -92,6 +96,9 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { @JsonProperty(FIELD_NAME_TASKMANAGER_ID) private final String taskmanagerId; + @JsonProperty(FIELD_NAME_STATUS_DURATION) + private final Map<ExecutionState, Long> statusDuration; + @JsonCreator public SubtaskExecutionAttemptDetailsInfo( @JsonProperty(FIELD_NAME_SUBTASK_INDEX) int subtaskIndex, @@ -102,7 +109,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { @JsonProperty(FIELD_NAME_END_TIME) long endTime, @JsonProperty(FIELD_NAME_DURATION) long duration, @JsonProperty(FIELD_NAME_METRICS) IOMetricsInfo ioMetricsInfo, - @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskmanagerId) { + @JsonProperty(FIELD_NAME_TASKMANAGER_ID) String taskmanagerId, + @JsonProperty(FIELD_NAME_STATUS_DURATION) Map<ExecutionState, Long> statusDuration) { this.subtaskIndex = subtaskIndex; this.status = Preconditions.checkNotNull(status); @@ -114,6 +122,7 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { this.duration = duration; this.ioMetricsInfo = Preconditions.checkNotNull(ioMetricsInfo); this.taskmanagerId = Preconditions.checkNotNull(taskmanagerId); + this.statusDuration = Preconditions.checkNotNull(statusDuration); } public int getSubtaskIndex() { @@ -148,6 +157,14 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { return duration; } + public Map<ExecutionState, Long> getStatusDuration() { + return statusDuration; + } + + public long getStatusDuration(ExecutionState state) { + return statusDuration.get(state); + } + public IOMetricsInfo getIoMetricsInfo() { return ioMetricsInfo; } @@ -176,7 +193,8 
@@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { && endTime == that.endTime && duration == that.duration && Objects.equals(ioMetricsInfo, that.ioMetricsInfo) - && Objects.equals(taskmanagerId, that.taskmanagerId); + && Objects.equals(taskmanagerId, that.taskmanagerId) + && Objects.equals(statusDuration, that.statusDuration); } @Override @@ -191,7 +209,8 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { endTime, duration, ioMetricsInfo, - taskmanagerId); + taskmanagerId, + statusDuration); } public static SubtaskExecutionAttemptDetailsInfo create( @@ -240,6 +259,56 @@ public class SubtaskExecutionAttemptDetailsInfo implements ResponseBody { endTime, duration, ioMetricsInfo, - taskmanagerId); + taskmanagerId, + getExecutionStateDuration(execution)); + } + + private static Map<ExecutionState, Long> getExecutionStateDuration(AccessExecution execution) { + Map<ExecutionState, Long> executionStateDuration = new HashMap<>(); + long now = System.currentTimeMillis(); + ExecutionState state = execution.getState(); + executionStateDuration.put( + ExecutionState.CREATED, + calculateStateDuration( + execution.getStateTimestamp(ExecutionState.CREATED), + state == ExecutionState.CREATED + ? now + : execution.getStateEndTimestamp(ExecutionState.CREATED))); + executionStateDuration.put( + ExecutionState.SCHEDULED, + calculateStateDuration( + execution.getStateTimestamp(ExecutionState.SCHEDULED), + state == ExecutionState.SCHEDULED + ? now + : execution.getStateEndTimestamp(ExecutionState.SCHEDULED))); + executionStateDuration.put( + ExecutionState.DEPLOYING, + calculateStateDuration( + execution.getStateTimestamp(ExecutionState.DEPLOYING), + state == ExecutionState.DEPLOYING + ? 
now + : execution.getStateEndTimestamp(ExecutionState.DEPLOYING))); + executionStateDuration.put( + ExecutionState.INITIALIZING, + calculateStateDuration( + execution.getStateTimestamp(ExecutionState.INITIALIZING), + state == ExecutionState.INITIALIZING + ? now + : execution.getStateEndTimestamp(ExecutionState.INITIALIZING))); + executionStateDuration.put( + ExecutionState.RUNNING, + calculateStateDuration( + execution.getStateTimestamp(ExecutionState.RUNNING), + state == ExecutionState.RUNNING + ? now + : execution.getStateEndTimestamp(ExecutionState.RUNNING))); + return executionStateDuration; + } + + private static long calculateStateDuration(long start, long end) { + if (start == 0 || end == 0) { + return -1; + } + return end - start; } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 6819ef941c7..b6e9cefc2b4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -435,6 +435,9 @@ public class ArchivedExecutionGraphTest extends TestLogger { assertEquals(runtimeExecution.getAttemptNumber(), archivedExecution.getAttemptNumber()); assertArrayEquals( runtimeExecution.getStateTimestamps(), archivedExecution.getStateTimestamps()); + assertArrayEquals( + runtimeExecution.getStateEndTimestamps(), + archivedExecution.getStateEndTimestamps()); assertEquals(runtimeExecution.getState(), archivedExecution.getState()); assertEquals( runtimeExecution.getAssignedResourceLocation(), @@ -472,6 +475,33 @@ public class ArchivedExecutionGraphTest extends TestLogger { assertEquals( runtimeExecution.getStateTimestamp(ExecutionState.FAILED), archivedExecution.getStateTimestamp(ExecutionState.FAILED)); + assertEquals( + 
runtimeExecution.getStateEndTimestamp(ExecutionState.CREATED), + archivedExecution.getStateEndTimestamp(ExecutionState.CREATED)); + assertEquals( + runtimeExecution.getStateEndTimestamp(ExecutionState.SCHEDULED), + archivedExecution.getStateEndTimestamp(ExecutionState.SCHEDULED)); + assertEquals( + runtimeExecution.getStateEndTimestamp(ExecutionState.DEPLOYING), + archivedExecution.getStateEndTimestamp(ExecutionState.DEPLOYING)); + assertEquals( + runtimeExecution.getStateEndTimestamp(ExecutionState.INITIALIZING), + archivedExecution.getStateEndTimestamp(ExecutionState.INITIALIZING)); + assertEquals( + runtimeExecution.getStateEndTimestamp(ExecutionState.RUNNING), + archivedExecution.getStateEndTimestamp(ExecutionState.RUNNING)); + assertEquals( + runtimeExecution.getStateEndTimestamp(ExecutionState.FINISHED), + archivedExecution.getStateEndTimestamp(ExecutionState.FINISHED)); + assertEquals( + runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELING), + archivedExecution.getStateEndTimestamp(ExecutionState.CANCELING)); + assertEquals( + runtimeExecution.getStateEndTimestamp(ExecutionState.CANCELED), + archivedExecution.getStateEndTimestamp(ExecutionState.CANCELED)); + assertEquals( + runtimeExecution.getStateEndTimestamp(ExecutionState.FAILED), + archivedExecution.getStateEndTimestamp(ExecutionState.FAILED)); compareStringifiedAccumulators( runtimeExecution.getUserAccumulatorsStringified(), archivedExecution.getUserAccumulatorsStringified()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java index c719fecf860..3f1c54b1238 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionHistoryTest.java @@ -80,6 +80,7 @@ class ExecutionHistoryTest { null, null, null, + new 
long[ExecutionState.values().length], new long[ExecutionState.values().length]); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java index b5eab9a20a2..93d3cf9c99d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandlerTest.java @@ -330,6 +330,7 @@ public class JobExceptionsHandlerTest extends TestLogger { final StringifiedAccumulatorResult[] emptyAccumulators = new StringifiedAccumulatorResult[0]; final long[] timestamps = new long[ExecutionState.values().length]; + final long[] endTimestamps = new long[ExecutionState.values().length]; final ExecutionState expectedState = ExecutionState.RUNNING; final LocalTaskManagerLocation assignedResourceLocation = new LocalTaskManagerLocation(); @@ -352,7 +353,8 @@ public class JobExceptionsHandlerTest extends TestLogger { System.currentTimeMillis()), assignedResourceLocation, allocationID, - timestamps), + timestamps, + endTimestamps), new ExecutionHistory(0)) }, jobVertexID, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java index 483ad2facf1..4c267329a74 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskCurrentAttemptDetailsHandlerTest.java @@ -50,6 +50,7 @@ import org.junit.Test; import java.util.Collections; import java.util.HashMap; +import java.util.Map; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; 
import static org.junit.Assert.assertEquals; @@ -87,7 +88,9 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger { accumulateBackPressuredTime); final long[] timestamps = new long[ExecutionState.values().length]; + final long[] endTimestamps = new long[ExecutionState.values().length]; timestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs; + endTimestamps[ExecutionState.DEPLOYING.ordinal()] = deployingTs + 10; final ExecutionState expectedState = ExecutionState.FINISHED; timestamps[expectedState.ordinal()] = finishedTs; @@ -106,7 +109,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger { null, assignedResourceLocation, allocationID, - timestamps); + timestamps, + endTimestamps); final ArchivedExecutionVertex executionVertex = new ArchivedExecutionVertex( @@ -170,6 +174,13 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger { accumulateIdleTime, accumulateBusyTime); + final Map<ExecutionState, Long> statusDuration = new HashMap<>(); + statusDuration.put(ExecutionState.CREATED, -1L); + statusDuration.put(ExecutionState.SCHEDULED, -1L); + statusDuration.put(ExecutionState.DEPLOYING, 10L); + statusDuration.put(ExecutionState.INITIALIZING, -1L); + statusDuration.put(ExecutionState.RUNNING, -1L); + final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo = new SubtaskExecutionAttemptDetailsInfo( subtaskIndex, @@ -180,7 +191,8 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger { finishedTs, finishedTs - deployingTs, ioMetricsInfo, - assignedResourceLocation.getResourceID().getResourceIdString()); + assignedResourceLocation.getResourceID().getResourceIdString(), + statusDuration); assertEquals(expectedDetailsInfo, detailsInfo); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java index e5d98dbf8f4..bf1b9561e43 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptAccumulatorsHandlerTest.java @@ -99,6 +99,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandlerTest extends TestLogger { null, null, null, + new long[ExecutionState.values().length], new long[ExecutionState.values().length]); // Invoke tested method. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java index 4a288e738be..a044f3f87ff 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -52,6 +52,7 @@ import org.junit.Test; import java.util.Collections; import java.util.HashMap; +import java.util.Map; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.junit.Assert.assertEquals; @@ -107,6 +108,7 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger { null, null, null, + new long[ExecutionState.values().length], new long[ExecutionState.values().length]), new ExecutionHistory(0)) }, @@ -175,6 +177,13 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger { accumulateIdleTime, accumulateBusyTime); + final Map<ExecutionState, Long> statusDuration = new HashMap<>(); + statusDuration.put(ExecutionState.CREATED, -1L); + statusDuration.put(ExecutionState.SCHEDULED, -1L); + 
statusDuration.put(ExecutionState.DEPLOYING, -1L); + statusDuration.put(ExecutionState.INITIALIZING, -1L); + statusDuration.put(ExecutionState.RUNNING, -1L); + final SubtaskExecutionAttemptDetailsInfo expectedDetailsInfo = new SubtaskExecutionAttemptDetailsInfo( subtaskIndex, @@ -185,7 +194,8 @@ public class SubtaskExecutionAttemptDetailsHandlerTest extends TestLogger { 0L, -1L, ioMetricsInfo, - "(unassigned)"); + "(unassigned)", + statusDuration); assertEquals(expectedDetailsInfo, detailsInfo); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java index 806ce560494..50c0865f85c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexDetailsInfoTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.messages.job.SubtaskExecutionAttemptDetails import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Random; @@ -62,7 +63,8 @@ public class JobVertexDetailsInfoTest System.currentTimeMillis(), 1L, jobVertexMetrics, - "taskmanagerId1")); + "taskmanagerId1", + Collections.singletonMap(ExecutionState.CREATED, 10L))); vertexTaskDetailList.add( new SubtaskExecutionAttemptDetailsInfo( 1, @@ -73,7 +75,8 @@ public class JobVertexDetailsInfoTest System.currentTimeMillis(), 1L, jobVertexMetrics, - "taskmanagerId2")); + "taskmanagerId2", + Collections.singletonMap(ExecutionState.CREATED, 10L))); vertexTaskDetailList.add( new SubtaskExecutionAttemptDetailsInfo( 2, @@ -84,7 +87,8 @@ public class JobVertexDetailsInfoTest System.currentTimeMillis(), 1L, jobVertexMetrics, - "taskmanagerId3")); + "taskmanagerId3", + Collections.singletonMap(ExecutionState.CREATED, 10L))); int 
parallelism = 1 + (random.nextInt() / 3); return new JobVertexDetailsInfo( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java index f022d6ced26..48158c05b56 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/job/SubtaskExecutionAttemptDetailsInfoTest.java @@ -22,6 +22,8 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.rest.messages.RestResponseMarshallingTestBase; import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo; +import java.util.HashMap; +import java.util.Map; import java.util.Random; /** Tests (un)marshalling of the {@link SubtaskExecutionAttemptDetailsInfo}. */ @@ -51,6 +53,13 @@ public class SubtaskExecutionAttemptDetailsInfoTest Math.abs(random.nextLong()), Math.abs(random.nextDouble())); + final Map<ExecutionState, Long> statusDuration = new HashMap<>(); + statusDuration.put(ExecutionState.CREATED, 10L); + statusDuration.put(ExecutionState.SCHEDULED, 20L); + statusDuration.put(ExecutionState.DEPLOYING, 30L); + statusDuration.put(ExecutionState.INITIALIZING, 40L); + statusDuration.put(ExecutionState.RUNNING, 50L); + return new SubtaskExecutionAttemptDetailsInfo( Math.abs(random.nextInt()), ExecutionState.values()[random.nextInt(ExecutionState.values().length)], @@ -60,6 +69,7 @@ public class SubtaskExecutionAttemptDetailsInfoTest Math.abs(random.nextLong()), Math.abs(random.nextLong()), ioMetricsInfo, - "taskmanagerId"); + "taskmanagerId", + statusDuration); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java index f3ec6fe2f1d..5173ba7c2d0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/exceptionhistory/TestingAccessExecution.java @@ -77,6 +77,11 @@ public class TestingAccessExecution implements AccessExecution { throw new UnsupportedOperationException("getStateTimestamps should not be called."); } + @Override + public long[] getStateEndTimestamps() { + throw new UnsupportedOperationException("getStateTimestamps should not be called."); + } + @Override public ExecutionState getState() { return state; @@ -87,6 +92,11 @@ public class TestingAccessExecution implements AccessExecution { throw new UnsupportedOperationException("getStateTimestamp should not be called."); } + @Override + public long getStateEndTimestamp(ExecutionState state) { + throw new UnsupportedOperationException("getStateTimestamp should not be called."); + } + @Override public StringifiedAccumulatorResult[] getUserAccumulatorsStringified() { throw new UnsupportedOperationException(
