Repository: flink
Updated Branches:
  refs/heads/master 0a286d0ff -> 9829ca00d


[FLINK-7704] [flip6] Add JobPlanHandler for new RestServerEndpoint

This closes #4768.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9829ca00
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9829ca00
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9829ca00

Branch: refs/heads/master
Commit: 9829ca00dff201879724847b498fe0432219cb53
Parents: 0a286d0
Author: yew1eb <[email protected]>
Authored: Wed Oct 4 01:07:49 2017 +0800
Committer: Till Rohrmann <[email protected]>
Committed: Tue Oct 10 18:44:06 2017 +0200

----------------------------------------------------------------------
 .../dispatcher/DispatcherRestEndpoint.java      |  11 ++
 .../rest/handler/job/JobPlanHandler.java        |  61 ++++++++++
 .../runtime/rest/messages/JobPlanHeaders.java   |  71 ++++++++++++
 .../runtime/rest/messages/JobPlanInfo.java      | 113 +++++++++++++++++++
 .../legacy/messages/JobPlanInfoTest.java        |  41 +++++++
 5 files changed, 297 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 2a2d9be..6297b41 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
 import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
 import org.apache.flink.runtime.rest.handler.job.BlobServerPortHandler;
 import org.apache.flink.runtime.rest.handler.job.JobConfigHandler;
+import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
 import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
 import org.apache.flink.runtime.rest.handler.job.JobTerminationHandler;
 import 
org.apache.flink.runtime.rest.handler.job.checkpoints.CheckpointConfigHandler;
@@ -51,6 +52,7 @@ import 
org.apache.flink.runtime.rest.messages.CurrentJobsOverviewHandlerHeaders;
 import org.apache.flink.runtime.rest.messages.DashboardConfigurationHeaders;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.JobConfigHeaders;
+import org.apache.flink.runtime.rest.messages.JobPlanHeaders;
 import org.apache.flink.runtime.rest.messages.JobTerminationHeaders;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointConfigHeaders;
 import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatisticDetailsHeaders;
@@ -186,6 +188,14 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        executor,
                        checkpointStatsCache);
 
+               JobPlanHandler jobPlanHandler = new JobPlanHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       JobPlanHeaders.getInstance(),
+                       executionGraphCache,
+                       executor);
+
                final File tmpDir = restConfiguration.getTmpDir();
 
                Optional<StaticFileServerHandler<DispatcherGateway>> 
optWebContent;
@@ -210,6 +220,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                handlers.add(Tuple2.of(CheckpointConfigHeaders.getInstance(), 
checkpointConfigHandler));
                
handlers.add(Tuple2.of(CheckpointingStatisticsHeaders.getInstance(), 
checkpointStatisticsHandler));
                
handlers.add(Tuple2.of(CheckpointStatisticDetailsHeaders.getInstance(), 
checkpointStatisticDetailsHandler));
+               handlers.add(Tuple2.of(JobPlanHeaders.getInstance(), 
jobPlanHandler));
 
                BlobServerPortHandler blobServerPortHandler = new 
BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
                
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), 
blobServerPortHandler));

http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
new file mode 100644
index 0000000..c8e6f8b
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
@@ -0,0 +1,61 @@
+package org.apache.flink.runtime.rest.handler.job;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.JobMessageParameters;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+
+/**
+ * Handler serving the job execution plan.
+ */
+public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, 
JobMessageParameters> {
+
+       public JobPlanHandler(
+               CompletableFuture<String> localRestAddress,
+               GatewayRetriever<? extends RestfulGateway> leaderRetriever,
+               Time timeout,
+               MessageHeaders<EmptyRequestBody, JobPlanInfo, 
JobMessageParameters> messageHeaders,
+               ExecutionGraphCache executionGraphCache,
+               Executor executor) {
+
+               super(
+                       localRestAddress,
+                       leaderRetriever,
+                       timeout,
+                       messageHeaders,
+                       executionGraphCache,
+                       executor);
+       }
+
+       @Override
+       protected JobPlanInfo handleRequest(HandlerRequest<EmptyRequestBody, 
JobMessageParameters> request, AccessExecutionGraph executionGraph) throws 
RestHandlerException {
+               return new JobPlanInfo(executionGraph.getJsonPlan());
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
new file mode 100644
index 0000000..17204bb
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanHeaders.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.HttpMethodWrapper;
+import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+/**
+ * Message headers for the {@link JobPlanHandler}.
+ */
+public class JobPlanHeaders implements MessageHeaders<EmptyRequestBody, 
JobPlanInfo, JobMessageParameters> {
+
+       private static final JobPlanHeaders INSTANCE = new JobPlanHeaders();
+
+       public static final String URL = "/jobs/:jobid/plan";
+
+       private JobPlanHeaders() {
+       }
+
+       @Override
+       public Class<EmptyRequestBody> getRequestClass() {
+               return EmptyRequestBody.class;
+       }
+
+       @Override
+       public Class<JobPlanInfo> getResponseClass() {
+               return JobPlanInfo.class;
+       }
+
+       @Override
+       public HttpResponseStatus getResponseStatusCode() {
+               return HttpResponseStatus.OK;
+       }
+
+       @Override
+       public JobMessageParameters getUnresolvedMessageParameters() {
+               return new JobMessageParameters();
+       }
+
+       @Override
+       public HttpMethodWrapper getHttpMethod() {
+               return HttpMethodWrapper.GET;
+       }
+
+       @Override
+       public String getTargetRestEndpointURL() {
+               return URL;
+       }
+
+       public static JobPlanHeaders getInstance() {
+               return INSTANCE;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
new file mode 100644
index 0000000..3987723
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobPlanInfo.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.messages;
+
+import org.apache.flink.runtime.rest.handler.job.JobPlanHandler;
+import org.apache.flink.util.Preconditions;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.DeserializationContext;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import com.fasterxml.jackson.databind.deser.std.StdDeserializer;
+import com.fasterxml.jackson.databind.ser.std.StdSerializer;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * Response type of the {@link JobPlanHandler}.
+ */
+@JsonSerialize(using = JobPlanInfo.Serializer.class)
+@JsonDeserialize(using = JobPlanInfo.Deserializer.class)
+public class JobPlanInfo implements ResponseBody {
+
+       private final String jsonPlan;
+
+       public JobPlanInfo(String jsonPlan) {
+               this.jsonPlan = Preconditions.checkNotNull(jsonPlan);
+       }
+
+       public String getJsonPlan() {
+               return jsonPlan;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               JobPlanInfo that = (JobPlanInfo) o;
+               return Objects.equals(jsonPlan, that.jsonPlan);
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(jsonPlan);
+       }
+
+       
//---------------------------------------------------------------------------------
+       // Static helper classes
+       
//---------------------------------------------------------------------------------
+
+       /**
+        * Json serializer for the {@link JobPlanInfo}.
+        */
+       public static final class Serializer extends StdSerializer<JobPlanInfo> 
{
+
+               private static final long serialVersionUID = 
-1551666039618928811L;
+
+               public Serializer() {
+                       super(JobPlanInfo.class);
+               }
+
+               @Override
+               public void serialize(
+                       JobPlanInfo jobPlanInfo,
+                       JsonGenerator jsonGenerator,
+                       SerializerProvider serializerProvider) throws 
IOException {
+                       jsonGenerator.writeString(jobPlanInfo.getJsonPlan());
+               }
+       }
+
+       /**
+        * Json deserializer for the {@link JobPlanInfo}.
+        */
+       public static final class Deserializer extends 
StdDeserializer<JobPlanInfo> {
+
+               private static final long serialVersionUID = 
-3580088509877177213L;
+
+               public Deserializer() {
+                       super(JobPlanInfo.class);
+               }
+
+               @Override
+               public JobPlanInfo deserialize(
+                       JsonParser jsonParser,
+                       DeserializationContext deserializationContext) throws 
IOException {
+                       final String jsonPlan = jsonParser.getText();
+                       return new JobPlanInfo(jsonPlan);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9829ca00/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
new file mode 100644
index 0000000..1fe51d0
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/messages/JobPlanInfoTest.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.legacy.messages;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.rest.messages.JobPlanInfo;
+
+/**
+ * Tests that the {@link JobPlanInfo} can be marshalled and unmarshalled.
+ */
+public class JobPlanInfoTest extends 
RestResponseMarshallingTestBase<JobPlanInfo> {
+
+       @Override
+       protected Class<JobPlanInfo> getTestResponseClass() {
+               return JobPlanInfo.class;
+       }
+
+       @Override
+       protected JobPlanInfo getTestResponseInstance() {
+               JobID jobID = new JobID();
+               String jobName = "job_007";
+               String jsonPlan = "{\"jobid\":\"" + jobID + "\", \"name\":\"" + 
jobName + "\", \"nodes\":[]}";
+               return new JobPlanInfo(jsonPlan);
+       }
+}

Reply via email to