[FLINK-7858][flip6] Port JobVertexTaskManagersHandler to REST endpoint

This closes #5149.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/056c72af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/056c72af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/056c72af

Branch: refs/heads/master
Commit: 056c72af994bc0b7bd838faff6b2991763fc2ac1
Parents: 5757942
Author: zjureel <[email protected]>
Authored: Tue Dec 19 16:56:50 2017 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Thu Jan 25 15:33:30 2018 +0100

----------------------------------------------------------------------
 .../job/JobVertexTaskManagersHandler.java       | 165 ++++++++++++++++++
 .../messages/JobVertexTaskManagersHeaders.java  |  72 ++++++++
 .../messages/JobVertexTaskManagersInfo.java     | 171 +++++++++++++++++++
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  13 ++
 .../messages/JobVertexTaskManagersInfoTest.java |  65 +++++++
 5 files changed, 486 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
new file mode 100644
index 0000000..9b59e8d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
+import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.util.MutableIOMetrics;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
+import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersInfo;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Request handler for the job vertex task managers.
+ */
+public class JobVertexTaskManagersHandler extends 
AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, 
JobVertexMessageParameters> {
+       private MetricFetcher<?> metricFetcher;
+
+       public JobVertexTaskManagersHandler(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+                       Time timeout,
+                       Map<String, String> responseHeaders,
+                       MessageHeaders<EmptyRequestBody, 
JobVertexTaskManagersInfo, JobVertexMessageParameters> messageHeaders,
+                       ExecutionGraphCache executionGraphCache,
+                       Executor executor,
+                       MetricFetcher<?> metricFetcher) {
+               super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders, executionGraphCache, executor);
+               this.metricFetcher = metricFetcher;
+       }
+
+       @Override
+       protected JobVertexTaskManagersInfo handleRequest(
+                       HandlerRequest<EmptyRequestBody, 
JobVertexMessageParameters> request,
+                       AccessExecutionGraph executionGraph) throws 
RestHandlerException {
+               JobID jobID = 
request.getPathParameter(JobIDPathParameter.class);
+               JobVertexID jobVertexID = 
request.getPathParameter(JobVertexIdPathParameter.class);
+               AccessExecutionJobVertex jobVertex = 
executionGraph.getJobVertex(jobVertexID);
+
+               // Build a map that groups tasks by TaskManager
+               Map<String, List<AccessExecutionVertex>> taskManagerVertices = 
new HashMap<>();
+               for (AccessExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
+                       TaskManagerLocation location = 
vertex.getCurrentAssignedResourceLocation();
+                       String taskManager = location == null ? "(unassigned)" 
: location.getHostname() + ":" + location.dataPort();
+                       List<AccessExecutionVertex> vertices = 
taskManagerVertices.get(taskManager);
+                       if (vertices == null) {
+                               vertices = new ArrayList<>();
+                               taskManagerVertices.put(taskManager, vertices);
+                       }
+
+                       vertices.add(vertex);
+               }
+
+               final long now = System.currentTimeMillis();
+
+               List<JobVertexTaskManagersInfo.TaskManagersInfo> 
taskManagersInfoList = new ArrayList<>();
+               for (Map.Entry<String, List<AccessExecutionVertex>> entry : 
taskManagerVertices.entrySet()) {
+                       String host = entry.getKey();
+                       List<AccessExecutionVertex> taskVertices = 
entry.getValue();
+
+                       int[] tasksPerState = new 
int[ExecutionState.values().length];
+
+                       long startTime = Long.MAX_VALUE;
+                       long endTime = 0;
+                       boolean allFinished = true;
+
+                       MutableIOMetrics counts = new MutableIOMetrics();
+
+                       for (AccessExecutionVertex vertex : taskVertices) {
+                               final ExecutionState state = 
vertex.getExecutionState();
+                               tasksPerState[state.ordinal()]++;
+
+                               // take the earliest start time
+                               long started = 
vertex.getStateTimestamp(ExecutionState.DEPLOYING);
+                               if (started > 0) {
+                                       startTime = Math.min(startTime, 
started);
+                               }
+
+                               allFinished &= state.isTerminal();
+                               endTime = Math.max(endTime, 
vertex.getStateTimestamp(state));
+
+                               counts.addIOMetrics(
+                                       vertex.getCurrentExecutionAttempt(),
+                                       metricFetcher,
+                                       jobID.toString(),
+                                       jobVertex.getJobVertexId().toString());
+                       }
+
+                       long duration;
+                       if (startTime < Long.MAX_VALUE) {
+                               if (allFinished) {
+                                       duration = endTime - startTime;
+                               }
+                               else {
+                                       endTime = -1L;
+                                       duration = now - startTime;
+                               }
+                       }
+                       else {
+                               startTime = -1L;
+                               endTime = -1L;
+                               duration = -1L;
+                       }
+
+                       ExecutionState jobVertexState =
+                               
ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, 
taskVertices.size());
+                       final IOMetricsInfo jobVertexMetrics = new 
IOMetricsInfo(
+                               counts.getNumBytesInLocal() + 
counts.getNumBytesInRemote(),
+                               counts.isNumBytesInLocalComplete() && 
counts.isNumBytesInRemoteComplete(),
+                               counts.getNumBytesOut(),
+                               counts.isNumBytesOutComplete(),
+                               counts.getNumRecordsIn(),
+                               counts.isNumRecordsInComplete(),
+                               counts.getNumRecordsOut(),
+                               counts.isNumRecordsOutComplete());
+
+                       Map<ExecutionState, Integer> statusCounts = new 
HashMap<>();
+                       for (ExecutionState state : ExecutionState.values()) {
+                               statusCounts.put(state, 
tasksPerState[state.ordinal()]);
+                       }
+                       taskManagersInfoList.add(new 
JobVertexTaskManagersInfo.TaskManagersInfo(host, jobVertexState, startTime, 
endTime, duration, jobVertexMetrics, statusCounts));
+               }
+
+               return new JobVertexTaskManagersInfo(jobVertexID, 
jobVertex.getName(), now, taskManagersInfoList);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
new file mode 100644
index 0000000..311d047
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobVertexTaskManagersHandler;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobVertexTaskManagersHandler}.
+ */
+public class JobVertexTaskManagersHeaders implements 
MessageHeaders<EmptyRequestBody, JobVertexTaskManagersInfo, 
JobVertexMessageParameters> {
+
+       private static final JobVertexTaskManagersHeaders INSTANCE = new 
JobVertexTaskManagersHeaders();
+
+       public static final String URL = "/jobs" +
+               "/:" + JobIDPathParameter.KEY +
+               "/vertices" +
+               "/:" + JobVertexIdPathParameter.KEY +
+               "/taskmanagers";
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public Class<JobVertexTaskManagersInfo> getResponseClass() {
+               return JobVertexTaskManagersInfo.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public JobVertexMessageParameters getUnresolvedMessageParameters() {
+               return new JobVertexMessageParameters();
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       public static JobVertexTaskManagersHeaders getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
new file mode 100644
index 0000000..fc30155
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.handler.job.JobVertexTaskManagersHandler;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDDeserializer;
+import org.apache.flink.runtime.rest.messages.json.JobVertexIDSerializer;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Response type of the {@link JobVertexTaskManagersHandler}.
+ */
+public class JobVertexTaskManagersInfo implements ResponseBody {
+       public static final String VERTEX_TASK_FIELD_ID = "id";
+       public static final String VERTEX_TASK_FIELD_NAME = "name";
+       public static final String VERTEX_TASK_FIELD_NOW = "now";
+       public static final String VERTEX_TASK_FIELD_TASK_MANAGERS = 
"taskmanagers";
+
+       @JsonProperty(VERTEX_TASK_FIELD_ID)
+       @JsonSerialize(using = JobVertexIDSerializer.class)
+       private final JobVertexID jobVertexID;
+
+       @JsonProperty(VERTEX_TASK_FIELD_NAME)
+       private final String name;
+
+       @JsonProperty(VERTEX_TASK_FIELD_NOW)
+       private final long now;
+
+       @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS)
+       private List<TaskManagersInfo> taskManagers;
+
+       @JsonCreator
+       public JobVertexTaskManagersInfo(
+                       @JsonDeserialize(using = JobVertexIDDeserializer.class) 
@JsonProperty(VERTEX_TASK_FIELD_ID) JobVertexID jobVertexID,
+                       @JsonProperty(VERTEX_TASK_FIELD_NAME) String name,
+                       @JsonProperty(VERTEX_TASK_FIELD_NOW) long now,
+                       @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) 
List<TaskManagersInfo> taskManagers) {
+               this.jobVertexID = checkNotNull(jobVertexID);
+               this.name = checkNotNull(name);
+               this.now = now;
+               this.taskManagers = checkNotNull(taskManagers);
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               JobVertexTaskManagersInfo that = (JobVertexTaskManagersInfo) o;
+               return Objects.equals(jobVertexID, that.jobVertexID) &&
+                       Objects.equals(name, that.name) &&
+                       now == that.now &&
+                       Objects.equals(taskManagers, that.taskManagers);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(jobVertexID, name, now, taskManagers);
+       }
+
+       // ---------------------------------------------------
+       // Static inner classes
+       // ---------------------------------------------------
+
+       /**
+        * Detailed information about task managers.
+        */
+       public static class TaskManagersInfo {
+               public static final String TASK_MANAGERS_FIELD_HOST = "host";
+               public static final String TASK_MANAGERS_FIELD_STATUS = 
"status";
+               public static final String TASK_MANAGERS_FIELD_START_TIME = 
"start-time";
+               public static final String TASK_MANAGERS_FIELD_END_TIME = 
"end-time";
+               public static final String TASK_MANAGERS_FIELD_DURATION = 
"duration";
+               public static final String TASK_MANAGERS_FIELD_METRICS = 
"metrics";
+               public static final String TASK_MANAGERS_FIELD_STATUS_COUNTS = 
"status-counts";
+
+               @JsonProperty(TASK_MANAGERS_FIELD_HOST)
+               private final String host;
+
+               @JsonProperty(TASK_MANAGERS_FIELD_STATUS)
+               private final ExecutionState status;
+
+               @JsonProperty(TASK_MANAGERS_FIELD_START_TIME)
+               private final long startTime;
+
+               @JsonProperty(TASK_MANAGERS_FIELD_END_TIME)
+               private final long endTime;
+
+               @JsonProperty(TASK_MANAGERS_FIELD_DURATION)
+               private final long duration;
+
+               @JsonProperty(TASK_MANAGERS_FIELD_METRICS)
+               private final IOMetricsInfo metrics;
+
+               @JsonProperty(TASK_MANAGERS_FIELD_STATUS_COUNTS)
+               private final Map<ExecutionState, Integer> statusCounts;
+
+               @JsonCreator
+               public TaskManagersInfo(
+                               @JsonProperty(TASK_MANAGERS_FIELD_HOST) String 
host,
+                               @JsonProperty(TASK_MANAGERS_FIELD_STATUS) 
ExecutionState status,
+                               @JsonProperty(TASK_MANAGERS_FIELD_START_TIME) 
long startTime,
+                               @JsonProperty(TASK_MANAGERS_FIELD_END_TIME) 
long endTime,
+                               @JsonProperty(TASK_MANAGERS_FIELD_DURATION) 
long duration,
+                               @JsonProperty(TASK_MANAGERS_FIELD_METRICS) 
IOMetricsInfo metrics,
+                               
@JsonProperty(TASK_MANAGERS_FIELD_STATUS_COUNTS) Map<ExecutionState, Integer> 
statusCounts) {
+                       this.host = checkNotNull(host);
+                       this.status = checkNotNull(status);
+                       this.startTime = startTime;
+                       this.endTime = endTime;
+                       this.duration = duration;
+                       this.metrics = checkNotNull(metrics);
+                       this.statusCounts = checkNotNull(statusCounts);
+               }
+
+               @Override
+               public boolean equals(Object o) {
+                       if (this == o) {
+                               return true;
+                       }
+                       if (o == null || getClass() != o.getClass()) {
+                               return false;
+                       }
+                       TaskManagersInfo that = (TaskManagersInfo) o;
+                       return Objects.equals(host, that.host) &&
+                               Objects.equals(status, that.status) &&
+                               startTime == that.startTime &&
+                               endTime == that.endTime &&
+                               duration == that.duration &&
+                               Objects.equals(metrics, that.metrics) &&
+                               Objects.equals(statusCounts, that.statusCounts);
+               }
+
+               @Override
+               public int hashCode() {
+                       return Objects.hash(host, status, startTime, endTime, 
duration, metrics, statusCounts);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index e432752..30a68d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.rest.handler.job.JobExecutionResultHandler;
 import org.apache.flink.runtime.rest.handler.job.JobIdsHandler;
 import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobVertexAccumulatorsHandler;
+import org.apache.flink.runtime.rest.handler.job.JobVertexTaskManagersHandler;
 import org.apache.flink.runtime.rest.handler.job.JobsOverviewHandler;
 import 
org.apache.flink.runtime.rest.handler.job.SubtaskCurrentAttemptDetailsHandler;
 import 
org.apache.flink.runtime.rest.handler.job.SubtaskExecutionAttemptAccumulatorsHandler;
@@ -69,6 +70,7 @@ import 
org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
 import 
org.apache.flink.runtime.rest.messages.JobIdsWithStatusesOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexAccumulatorsHeaders;
+import org.apache.flink.runtime.rest.messages.JobVertexTaskManagersHeaders;
 import org.apache.flink.runtime.rest.messages.JobsOverviewHeaders;
 import org.apache.flink.runtime.rest.messages.SubtasksTimesHeaders;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
@@ -350,6 +352,16 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                        responseHeaders,
                        metricFetcher);
 
+               final JobVertexTaskManagersHandler jobVertexTaskManagersHandler 
= new JobVertexTaskManagersHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders,
+                       JobVertexTaskManagersHeaders.getInstance(),
+                       executionGraphCache,
+                       executor,
+                       metricFetcher);
+
                final JobExecutionResultHandler jobExecutionResultHandler = new 
JobExecutionResultHandler(
                        restAddressFuture,
                        leaderRetriever,
@@ -446,6 +458,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> 
extends RestServerEndp
                
handlers.add(Tuple2.of(SubtaskExecutionAttemptDetailsHeaders.getInstance(), 
subtaskExecutionAttemptDetailsHandler));
                
handlers.add(Tuple2.of(SubtaskExecutionAttemptAccumulatorsHeaders.getInstance(),
 subtaskExecutionAttemptAccumulatorsHandler));
                
handlers.add(Tuple2.of(SubtaskCurrentAttemptDetailsHeaders.getInstance(), 
subtaskCurrentAttemptDetailsHandler));
+               
handlers.add(Tuple2.of(JobVertexTaskManagersHeaders.getInstance(), 
jobVertexTaskManagersHandler));
 
                // This handler MUST be added last, as it otherwise masks all 
subsequent GET handlers
                optWebContent.ifPresent(

http://git-wip-us.apache.org/repos/asf/flink/blob/056c72af/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
new file mode 100644
index 0000000..1a7b521
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfoTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import static 
org.apache.flink.runtime.rest.messages.JobVertexTaskManagersInfo.TaskManagersInfo;
+
+/**
+ * Tests that the {@link JobVertexTaskManagersInfo} can be marshalled and 
unmarshalled.
+ */
+public class JobVertexTaskManagersInfoTest extends 
RestResponseMarshallingTestBase<JobVertexTaskManagersInfo> {
+       @Override
+       protected Class<JobVertexTaskManagersInfo> getTestResponseClass() {
+               return JobVertexTaskManagersInfo.class;
+       }
+
+       @Override
+       protected JobVertexTaskManagersInfo getTestResponseInstance() throws 
Exception {
+               final Random random = new Random();
+               List<TaskManagersInfo> taskManagersInfoList = new ArrayList<>();
+
+               final Map<ExecutionState, Integer> statusCounts = new 
HashMap<>(ExecutionState.values().length);
+               final IOMetricsInfo jobVertexMetrics = new IOMetricsInfo(
+                       random.nextLong(),
+                       random.nextBoolean(),
+                       random.nextLong(),
+                       random.nextBoolean(),
+                       random.nextLong(),
+                       random.nextBoolean(),
+                       random.nextLong(),
+                       random.nextBoolean());
+               int count = 100;
+               for (ExecutionState executionState : ExecutionState.values()) {
+                       statusCounts.put(executionState, count++);
+               }
+               taskManagersInfoList.add(new TaskManagersInfo("host1", 
ExecutionState.CANCELING, 1L, 2L, 3L, jobVertexMetrics, statusCounts));
+
+               return new JobVertexTaskManagersInfo(new JobVertexID(), "test", 
System.currentTimeMillis(), taskManagersInfoList);
+       }
+}

Reply via email to