[FLINK-7941] Store timestamps indexed by ExecutionState in SubtasksTimesInfo
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26e3d376
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26e3d376
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26e3d376

Branch: refs/heads/master
Commit: 26e3d3765a7bab2f4cb517476dfcb7e9c1b2ae30
Parents: 712d4cf
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Nov 3 18:59:19 2017 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Nov 7 15:07:45 2017 +0100

----------------------------------------------------------------------
 .../rest/handler/job/SubtasksTimesHandler.java | 4 +--
 .../rest/messages/SubtasksTimesInfo.java | 5 ++--
 .../rest/messages/SubtasksTimesInfoTest.java | 26 +++++++++++---------
 3 files changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/26e3d376/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
index feae3ab..bc72e51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
@@ -89,9 +89,9 @@ public class SubtasksTimesHandler extends AbstractExecutionGraphHandler<Subtasks
 TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
 String locationString = location == null ? "(unassigned)" : location.getHostname();

- Map<String, Long> timestampMap = new HashMap<>();
+ Map<ExecutionState, Long> timestampMap = new HashMap<>(ExecutionState.values().length);
 for (ExecutionState state : ExecutionState.values()) {
- timestampMap.put(state.name(), timestamps[state.ordinal()]);
+ timestampMap.put(state, timestamps[state.ordinal()]);
 }

 subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(

http://git-wip-us.apache.org/repos/asf/flink/blob/26e3d376/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
index d97a0d0..a0edf7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
@@ -18,6 +18,7 @@

 package org.apache.flink.runtime.rest.messages;

+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;

 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -109,13 +110,13 @@ public class SubtasksTimesInfo implements ResponseBody {
 private final long duration;

 @JsonProperty(FIELD_NAME_TIMESTAMPS)
- private final Map<String, Long> timestamps;
+ private final Map<ExecutionState, Long> timestamps;

 public SubtaskTimeInfo(
 @JsonProperty(FIELD_NAME_SUBTASK) int subtask,
 @JsonProperty(FIELD_NAME_HOST) String host,
 @JsonProperty(FIELD_NAME_DURATION) long duration,
- @JsonProperty(FIELD_NAME_TIMESTAMPS) Map<String, Long> timestamps) {
+ @JsonProperty(FIELD_NAME_TIMESTAMPS) Map<ExecutionState, Long> timestamps) {
 this.subtask = subtask;
 this.host = checkNotNull(host);
 this.duration = duration;
http://git-wip-us.apache.org/repos/asf/flink/blob/26e3d376/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
index 82eb21b..cbe5409 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
@@ -18,6 +18,8 @@

 package org.apache.flink.runtime.rest.messages;

+import org.apache.flink.runtime.execution.ExecutionState;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -37,22 +39,22 @@ public class SubtasksTimesInfoTest extends RestResponseMarshallingTestBase<Subta
 protected SubtasksTimesInfo getTestResponseInstance() throws Exception {
 List<SubtasksTimesInfo.SubtaskTimeInfo> subtasks = new ArrayList<>();

- Map<String, Long> subTimeMap1 = new HashMap<>();
- subTimeMap1.put("state11", System.currentTimeMillis());
- subTimeMap1.put("state12", System.currentTimeMillis());
- subTimeMap1.put("state13", System.currentTimeMillis());
+ Map<ExecutionState, Long> subTimeMap1 = new HashMap<>();
+ subTimeMap1.put(ExecutionState.RUNNING, 1L);
+ subTimeMap1.put(ExecutionState.FAILED, 2L);
+ subTimeMap1.put(ExecutionState.CANCELED, 3L);
 subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(0, "local1", 1L, subTimeMap1));

- Map<String, Long> subTimeMap2 = new HashMap<>();
- subTimeMap1.put("state21", System.currentTimeMillis());
- subTimeMap1.put("state22", System.currentTimeMillis());
- subTimeMap1.put("state23", System.currentTimeMillis());
+ Map<ExecutionState, Long> subTimeMap2 = new HashMap<>();
+ subTimeMap2.put(ExecutionState.RUNNING, 4L);
+ subTimeMap2.put(ExecutionState.FAILED, 5L);
+ subTimeMap2.put(ExecutionState.CANCELED, 6L);
 subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(1, "local2", 2L, subTimeMap2));

- Map<String, Long> subTimeMap3 = new HashMap<>();
- subTimeMap1.put("state31", System.currentTimeMillis());
- subTimeMap1.put("state32", System.currentTimeMillis());
- subTimeMap1.put("state33", System.currentTimeMillis());
+ Map<ExecutionState, Long> subTimeMap3 = new HashMap<>();
+ subTimeMap3.put(ExecutionState.SCHEDULED, 1L);
+ subTimeMap3.put(ExecutionState.FAILED, 2L);
+ subTimeMap3.put(ExecutionState.CANCELING, 3L);
 subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(2, "local3", 3L, subTimeMap3));

 return new SubtasksTimesInfo("testId", "testName", System.currentTimeMillis(), subtasks);