[FLINK-7941] Store timestamps indexed by ExecutionState in SubtasksTimesInfo


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/26e3d376
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/26e3d376
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/26e3d376

Branch: refs/heads/master
Commit: 26e3d3765a7bab2f4cb517476dfcb7e9c1b2ae30
Parents: 712d4cf
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Fri Nov 3 18:59:19 2017 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Nov 7 15:07:45 2017 +0100

----------------------------------------------------------------------
 .../rest/handler/job/SubtasksTimesHandler.java  |  4 +--
 .../rest/messages/SubtasksTimesInfo.java        |  5 ++--
 .../rest/messages/SubtasksTimesInfoTest.java    | 26 +++++++++++---------
 3 files changed, 19 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/26e3d376/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
index feae3ab..bc72e51 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/SubtasksTimesHandler.java
@@ -89,9 +89,9 @@ public class SubtasksTimesHandler extends AbstractExecutionGraphHandler<Subtasks
                        TaskManagerLocation location = vertex.getCurrentAssignedResourceLocation();
                        String locationString = location == null ? "(unassigned)" : location.getHostname();
 
-                       Map<String, Long> timestampMap = new HashMap<>();
+                       Map<ExecutionState, Long> timestampMap = new HashMap<>(ExecutionState.values().length);
                        for (ExecutionState state : ExecutionState.values()) {
-                               timestampMap.put(state.name(), timestamps[state.ordinal()]);
+                               timestampMap.put(state, timestamps[state.ordinal()]);
                        }
 
                        subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(

http://git-wip-us.apache.org/repos/asf/flink/blob/26e3d376/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
index d97a0d0..a0edf7d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfo.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rest.messages;
 
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.rest.handler.job.SubtasksTimesHandler;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
@@ -109,13 +110,13 @@ public class SubtasksTimesInfo implements ResponseBody {
                private final long duration;
 
                @JsonProperty(FIELD_NAME_TIMESTAMPS)
-               private final Map<String, Long> timestamps;
+               private final Map<ExecutionState, Long> timestamps;
 
                public SubtaskTimeInfo(
                                @JsonProperty(FIELD_NAME_SUBTASK) int subtask,
                                @JsonProperty(FIELD_NAME_HOST) String host,
                                @JsonProperty(FIELD_NAME_DURATION) long duration,
-                               @JsonProperty(FIELD_NAME_TIMESTAMPS) Map<String, Long> timestamps) {
+                               @JsonProperty(FIELD_NAME_TIMESTAMPS) Map<ExecutionState, Long> timestamps) {
                        this.subtask = subtask;
                        this.host = checkNotNull(host);
                        this.duration = duration;

http://git-wip-us.apache.org/repos/asf/flink/blob/26e3d376/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
index 82eb21b..cbe5409 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/SubtasksTimesInfoTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.rest.messages;
 
+import org.apache.flink.runtime.execution.ExecutionState;
+
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -37,22 +39,22 @@ public class SubtasksTimesInfoTest extends RestResponseMarshallingTestBase<Subta
        protected SubtasksTimesInfo getTestResponseInstance() throws Exception {
                List<SubtasksTimesInfo.SubtaskTimeInfo> subtasks = new ArrayList<>();
 
-               Map<String, Long> subTimeMap1 = new HashMap<>();
-               subTimeMap1.put("state11", System.currentTimeMillis());
-               subTimeMap1.put("state12", System.currentTimeMillis());
-               subTimeMap1.put("state13", System.currentTimeMillis());
+               Map<ExecutionState, Long> subTimeMap1 = new HashMap<>();
+               subTimeMap1.put(ExecutionState.RUNNING, 1L);
+               subTimeMap1.put(ExecutionState.FAILED, 2L);
+               subTimeMap1.put(ExecutionState.CANCELED, 3L);
                subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(0, "local1", 1L, subTimeMap1));
 
-               Map<String, Long> subTimeMap2 = new HashMap<>();
-               subTimeMap1.put("state21", System.currentTimeMillis());
-               subTimeMap1.put("state22", System.currentTimeMillis());
-               subTimeMap1.put("state23", System.currentTimeMillis());
+               Map<ExecutionState, Long> subTimeMap2 = new HashMap<>();
+               subTimeMap2.put(ExecutionState.RUNNING, 4L);
+               subTimeMap2.put(ExecutionState.FAILED, 5L);
+               subTimeMap2.put(ExecutionState.CANCELED, 6L);
                subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(1, "local2", 2L, subTimeMap2));
 
-               Map<String, Long> subTimeMap3 = new HashMap<>();
-               subTimeMap1.put("state31", System.currentTimeMillis());
-               subTimeMap1.put("state32", System.currentTimeMillis());
-               subTimeMap1.put("state33", System.currentTimeMillis());
+               Map<ExecutionState, Long> subTimeMap3 = new HashMap<>();
+               subTimeMap3.put(ExecutionState.SCHEDULED, 1L);
+               subTimeMap3.put(ExecutionState.FAILED, 2L);
+               subTimeMap3.put(ExecutionState.CANCELING, 3L);
                subtasks.add(new SubtasksTimesInfo.SubtaskTimeInfo(2, "local3", 3L, subTimeMap3));
 
                return new SubtasksTimesInfo("testId", "testName", System.currentTimeMillis(), subtasks);

Reply via email to