[FLINK-8633] [flip6] Expose rescaling of jobs via the Dispatcher

This commit exposes JobMaster#rescaleJob via the Dispatcher, so that
rescaling can be triggered from a REST handler.

This closes #5452.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4756573c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4756573c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4756573c

Branch: refs/heads/master
Commit: 4756573c257cfbc2390a4fc64e65f4de449a53a5
Parents: f83e2f7
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Feb 13 16:34:31 2018 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Thu Feb 22 17:32:37 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/dispatcher/Dispatcher.java       | 12 ++++++++++++
 .../flink/runtime/webmonitor/RestfulGateway.java   | 17 +++++++++++++++++
 2 files changed, 29 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4756573c/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
index e751bc4..b2d2b6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
@@ -40,6 +40,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
 import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
@@ -314,6 +315,17 @@ public abstract class Dispatcher extends FencedRpcEndpoint<DispatcherId> impleme
        }
 
        @Override
+       public CompletableFuture<Acknowledge> rescaleJob(JobID jobId, int 
newParallelism, RescalingBehaviour rescalingBehaviour, Time timeout) {
+               JobManagerRunner jobManagerRunner = 
jobManagerRunners.get(jobId);
+
+               if (jobManagerRunner == null) { // no runner is registered under this job id
+                       return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+               } else { // runner found: delegate to its JobManager gateway (job id is not passed on)
+                       return 
jobManagerRunner.getJobManagerGateway().rescaleJob(newParallelism, 
rescalingBehaviour, timeout);
+               }
+       }
+
+       @Override
        public CompletableFuture<String> requestRestAddress(Time timeout) {
                if (restAddress != null) {
                        return CompletableFuture.completedFuture(restAddress);

http://git-wip-us.apache.org/repos/asf/flink/blob/4756573c/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
index 65a4664..ed90f37 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmaster.JobResult;
+import org.apache.flink.runtime.jobmaster.RescalingBehaviour;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
 import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
@@ -169,4 +170,20 @@ public interface RestfulGateway extends RpcGateway {
                throw new UnsupportedOperationException();
        }
 
+       /**
+        * Trigger rescaling of the given job. This default implementation always throws {@link UnsupportedOperationException}.
+        *
+        * @param jobId specifying the job to rescale
+        * @param newParallelism new parallelism of the job
+        * @param rescalingBehaviour defining how strict the rescaling has to 
be executed
+        * @param timeout of this operation
+        * @return Future which is completed with {@link Acknowledge} once the 
rescaling was successful
+        */
+       default CompletableFuture<Acknowledge> rescaleJob(
+                       JobID jobId,
+                       int newParallelism,
+                       RescalingBehaviour rescalingBehaviour,
+                       @RpcTimeout Time timeout) {
+               throw new UnsupportedOperationException();
+       }
 }

Reply via email to