[FLINK-7858][flip6] Return with HTTP 404 if job or jobvertex are unknown

Annotate AccessExecutionGraph#getJobVertex(JobVertexID) with @Nullable.
Throw NotFoundException in JobVertexTaskManagersHandler if jobvertexId is unknown.
Throw NotFoundException in AbstractExecutionGraphHandler if jobId is unknown.
Copy Javadoc from legacy JobVertexTaskManagersHandler.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/37b4e2ce
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/37b4e2ce
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/37b4e2ce

Branch: refs/heads/master
Commit: 37b4e2cef687160f2bc7cedb7d2360825089569e
Parents: 056c72a
Author: gyao <[email protected]>
Authored: Wed Jan 24 12:24:35 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Thu Jan 25 15:55:54 2018 +0100

----------------------------------------------------------------------
 .../executiongraph/AccessExecutionGraph.java    |  3 +-
 .../flink/runtime/rest/NotFoundException.java   |  4 ++
 .../job/AbstractExecutionGraphHandler.java      | 15 ++++++-
 .../job/JobVertexTaskManagersHandler.java       | 41 +++++++++++++-------
 .../messages/JobVertexTaskManagersHeaders.java  |  2 +
 .../messages/JobVertexTaskManagersInfo.java     | 12 +++---
 6 files changed, 54 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
index 362afa1..8d1fa1d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/AccessExecutionGraph.java
@@ -76,8 +76,9 @@ public interface AccessExecutionGraph {
         * Returns the job vertex for the given {@link JobVertexID}.
         *
         * @param id id of job vertex to be returned
-        * @return job vertex for the given id, or null
+        * @return job vertex for the given id, or {@code null}
         */
+       @Nullable
        AccessExecutionJobVertex getJobVertex(JobVertexID id);
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
index 50060b0..f9db334 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/NotFoundException.java
@@ -33,4 +33,8 @@ public class NotFoundException extends RestHandlerException {
        public NotFoundException(String message) {
                super(message, HttpResponseStatus.NOT_FOUND);
        }
+
+       public NotFoundException(String message, Throwable cause) {
+               super(message, HttpResponseStatus.NOT_FOUND, cause);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index 7192832..7c42af1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -21,6 +21,8 @@ package org.apache.flink.runtime.rest.handler.job;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
@@ -32,6 +34,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.ResponseBody;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
@@ -79,8 +82,16 @@ public abstract class AbstractExecutionGraphHandler<R 
extends ResponseBody, M ex
                                } catch (RestHandlerException rhe) {
                                        throw new CompletionException(rhe);
                                }
-                       },
-                       executor);
+                       }, executor)
+                       .exceptionally(throwable -> {
+                               throwable = ExceptionUtils.stripCompletionException(throwable);
+                               if (throwable instanceof FlinkJobNotFoundException) {
+                                       throw new CompletionException(
+                                               new NotFoundException(String.format("Job %s not found", jobId), throwable));
+                               } else {
+                                       throw new CompletionException(throwable);
+                               }
+                       });
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
index 9b59e8d..24650a3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexTaskManagersHandler.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.AccessExecutionVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.rest.NotFoundException;
 import org.apache.flink.runtime.rest.handler.HandlerRequest;
 import org.apache.flink.runtime.rest.handler.RestHandlerException;
 import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
@@ -41,6 +42,7 @@ import 
org.apache.flink.runtime.rest.messages.job.metrics.IOMetricsInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -50,7 +52,8 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 /**
- * Request handler for the job vertex task managers.
+ * A request handler that provides the details of a job vertex, including id, 
name, and the
+ * runtime and metrics of all its subtasks aggregated by TaskManager.
  */
 public class JobVertexTaskManagersHandler extends 
AbstractExecutionGraphHandler<JobVertexTaskManagersInfo, 
JobVertexMessageParameters> {
        private MetricFetcher<?> metricFetcher;
@@ -65,7 +68,7 @@ public class JobVertexTaskManagersHandler extends 
AbstractExecutionGraphHandler<
                        Executor executor,
                        MetricFetcher<?> metricFetcher) {
                super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders, executionGraphCache, executor);
-               this.metricFetcher = metricFetcher;
+               this.metricFetcher = Preconditions.checkNotNull(metricFetcher);
        }
 
        @Override
@@ -76,23 +79,24 @@ public class JobVertexTaskManagersHandler extends 
AbstractExecutionGraphHandler<
                JobVertexID jobVertexID = 
request.getPathParameter(JobVertexIdPathParameter.class);
                AccessExecutionJobVertex jobVertex = 
executionGraph.getJobVertex(jobVertexID);
 
+               if (jobVertex == null) {
+                       throw new NotFoundException(String.format("JobVertex %s not found", jobVertexID));
+               }
+
                // Build a map that groups tasks by TaskManager
                Map<String, List<AccessExecutionVertex>> taskManagerVertices = 
new HashMap<>();
                for (AccessExecutionVertex vertex : 
jobVertex.getTaskVertices()) {
                        TaskManagerLocation location = 
vertex.getCurrentAssignedResourceLocation();
-                       String taskManager = location == null ? "(unassigned)" 
: location.getHostname() + ":" + location.dataPort();
-                       List<AccessExecutionVertex> vertices = 
taskManagerVertices.get(taskManager);
-                       if (vertices == null) {
-                               vertices = new ArrayList<>();
-                               taskManagerVertices.put(taskManager, vertices);
-                       }
-
+                       String taskManager = location == null ? "(unassigned)" 
: location.getHostname() + ':' + location.dataPort();
+                       List<AccessExecutionVertex> vertices = 
taskManagerVertices.computeIfAbsent(
+                               taskManager,
+                               ignored -> new ArrayList<>(4));
                        vertices.add(vertex);
                }
 
                final long now = System.currentTimeMillis();
 
-               List<JobVertexTaskManagersInfo.TaskManagersInfo> 
taskManagersInfoList = new ArrayList<>();
+               List<JobVertexTaskManagersInfo.TaskManagersInfo> 
taskManagersInfoList = new ArrayList<>(4);
                for (Map.Entry<String, List<AccessExecutionVertex>> entry : 
taskManagerVertices.entrySet()) {
                        String host = entry.getKey();
                        List<AccessExecutionVertex> taskVertices = 
entry.getValue();
@@ -141,8 +145,10 @@ public class JobVertexTaskManagersHandler extends 
AbstractExecutionGraphHandler<
                                duration = -1L;
                        }
 
-                       ExecutionState jobVertexState =
-                               
ExecutionJobVertex.getAggregateJobVertexState(tasksPerState, 
taskVertices.size());
+                       ExecutionState jobVertexState = 
ExecutionJobVertex.getAggregateJobVertexState(
+                               tasksPerState,
+                               taskVertices.size());
+
                        final IOMetricsInfo jobVertexMetrics = new 
IOMetricsInfo(
                                counts.getNumBytesInLocal() + 
counts.getNumBytesInRemote(),
                                counts.isNumBytesInLocalComplete() && 
counts.isNumBytesInRemoteComplete(),
@@ -153,11 +159,18 @@ public class JobVertexTaskManagersHandler extends 
AbstractExecutionGraphHandler<
                                counts.getNumRecordsOut(),
                                counts.isNumRecordsOutComplete());
 
-                       Map<ExecutionState, Integer> statusCounts = new 
HashMap<>();
+                       Map<ExecutionState, Integer> statusCounts = new 
HashMap<>(ExecutionState.values().length);
                        for (ExecutionState state : ExecutionState.values()) {
                                statusCounts.put(state, 
tasksPerState[state.ordinal()]);
                        }
-                       taskManagersInfoList.add(new 
JobVertexTaskManagersInfo.TaskManagersInfo(host, jobVertexState, startTime, 
endTime, duration, jobVertexMetrics, statusCounts));
+                       taskManagersInfoList.add(new 
JobVertexTaskManagersInfo.TaskManagersInfo(
+                               host,
+                               jobVertexState,
+                               startTime,
+                               endTime,
+                               duration,
+                               jobVertexMetrics,
+                               statusCounts));
                }
 
                return new JobVertexTaskManagersInfo(jobVertexID, 
jobVertex.getName(), now, taskManagersInfoList);

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
index 311d047..8424095 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersHeaders.java
@@ -36,6 +36,8 @@ public class JobVertexTaskManagersHeaders implements 
MessageHeaders<EmptyRequest
                "/:" + JobVertexIdPathParameter.KEY +
                "/taskmanagers";
 
+       private JobVertexTaskManagersHeaders() {}
+
        @Override
        public Class<EmptyRequestBody> getRequestClass() {
                return EmptyRequestBody.class;

http://git-wip-us.apache.org/repos/asf/flink/blob/37b4e2ce/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
index fc30155..75ff570 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobVertexTaskManagersInfo.java
@@ -30,7 +30,7 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonPro
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonDeserialize;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.annotation.JsonSerialize;
 
-import java.util.List;
+import java.util.Collection;
 import java.util.Map;
 import java.util.Objects;
 
@@ -56,18 +56,18 @@ public class JobVertexTaskManagersInfo implements 
ResponseBody {
        private final long now;
 
        @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS)
-       private List<TaskManagersInfo> taskManagers;
+       private Collection<TaskManagersInfo> taskManagerInfos;
 
        @JsonCreator
        public JobVertexTaskManagersInfo(
                        @JsonDeserialize(using = JobVertexIDDeserializer.class) 
@JsonProperty(VERTEX_TASK_FIELD_ID) JobVertexID jobVertexID,
                        @JsonProperty(VERTEX_TASK_FIELD_NAME) String name,
                        @JsonProperty(VERTEX_TASK_FIELD_NOW) long now,
-                       @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) 
List<TaskManagersInfo> taskManagers) {
+                       @JsonProperty(VERTEX_TASK_FIELD_TASK_MANAGERS) 
Collection<TaskManagersInfo> taskManagerInfos) {
                this.jobVertexID = checkNotNull(jobVertexID);
                this.name = checkNotNull(name);
                this.now = now;
-               this.taskManagers = checkNotNull(taskManagers);
+               this.taskManagerInfos = checkNotNull(taskManagerInfos);
        }
 
        @Override
@@ -82,12 +82,12 @@ public class JobVertexTaskManagersInfo implements 
ResponseBody {
                return Objects.equals(jobVertexID, that.jobVertexID) &&
                        Objects.equals(name, that.name) &&
                        now == that.now &&
-                       Objects.equals(taskManagers, that.taskManagers);
+                       Objects.equals(taskManagerInfos, that.taskManagerInfos);
        }
 
        @Override
        public int hashCode() {
-               return Objects.hash(jobVertexID, name, now, taskManagers);
+               return Objects.hash(jobVertexID, name, now, taskManagerInfos);
        }
 
        // ---------------------------------------------------

Reply via email to