Repository: flink
Updated Branches:
  refs/heads/master fb3bd1fce -> 1fc4a6097


[FLINK-7489] Remove startJobExecution and suspendExecution from JobMasterGateway

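The job lifecycle methods startJobExecution and suspendExecution should not be
exposed as RPCs. Therefore, this commit removes them from the JobMasterGateway
definition and makes them private methods of JobMaster. Callers that need to
suspend a job now use the new public JobMaster#suspend(Throwable) method, which
runs the suspension asynchronously via runAsync.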

This closes #4573.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1fc4a609
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1fc4a609
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1fc4a609

Branch: refs/heads/master
Commit: 1fc4a609761445ecf77c374ae9daec8a1ada2618
Parents: fb3bd1f
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Aug 22 16:33:05 2017 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sun Sep 3 23:05:12 2017 +0200

----------------------------------------------------------------------
 .../runtime/jobmaster/JobManagerRunner.java     |   2 +-
 .../flink/runtime/jobmaster/JobMaster.java      | 199 ++++++++++---------
 .../runtime/jobmaster/JobMasterGateway.java     |  10 -
 .../jobmaster/JobManagerRunnerMockTest.java     |   4 +-
 4 files changed, 109 insertions(+), 106 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1fc4a609/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
index c312cd3..8f1be4c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java
@@ -434,7 +434,7 @@ public class JobManagerRunner implements LeaderContender, 
OnCompletionActions, F
                        log.info("JobManager for job {} ({}) was revoked 
leadership at {}.",
                                jobGraph.getName(), jobGraph.getJobID(), 
getAddress());
 
-                       
jobManager.getSelfGateway(JobMasterGateway.class).suspendExecution(new 
Exception("JobManager is no longer the leader."));
+                       jobManager.suspend(new Exception("JobManager is no 
longer the leader."));
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1fc4a609/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a8a8632..c30749c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -318,7 +318,7 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
                        super.start();
 
                        log.info("JobManager started as leader {} for job {}", 
leaderSessionID, jobGraph.getJobID());
-                       selfGateway.startJobExecution();
+                       runAsync(this::startJobExecution);
                }
                else {
                        log.warn("Job already started with leader ID {}, 
ignoring this start request.", leaderSessionID);
@@ -326,6 +326,21 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
        }
 
        /**
+        * Suspends the job: all running tasks will be cancelled, and 
communication with other components
+        * will be disposed.
+        *
+        * <p>A job is mostly suspended because the leadership has been 
revoked. The job can be restarted by
+        * calling the {@link #start(UUID)} method once we take the leadership 
back again.
+        *
+        * <p>This method is executed asynchronously.
+        *
+        * @param cause The reason why this job has been suspended.
+        */
+       public void suspend(final Throwable cause) {
+               runAsync(() -> suspendExecution(cause));
+       }
+
+       /**
         * Suspend the job and shutdown all other services including rpc.
         */
        @Override
@@ -343,98 +358,6 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
        // RPC methods
        
//----------------------------------------------------------------------------------------------
 
-       //-- job starting and stopping  
-----------------------------------------------------------------
-
-       @Override
-       public void startJobExecution() {
-               // double check that the leader status did not change
-               if (leaderSessionID == null) {
-                       log.info("Aborting job startup - JobManager lost leader 
status");
-                       return;
-               }
-
-               log.info("Starting execution of job {} ({})", 
jobGraph.getName(), jobGraph.getJobID());
-
-               try {
-                       // start the slot pool make sure the slot pool now 
accepts messages for this leader
-                       log.debug("Staring SlotPool component");
-                       slotPool.start(leaderSessionID, getAddress());
-               } catch (Exception e) {
-                       log.error("Faild to start job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), e);
-
-                       handleFatalError(new Exception("Could not start job 
execution: Failed to start the slot pool.", e));
-               }
-
-               try {
-                       // job is ready to go, try to establish connection with 
resource manager
-                       //   - activate leader retrieval for the resource 
manager
-                       //   - on notification of the leader, the connection 
will be established and
-                       //     the slot pool will start requesting slots
-                       resourceManagerLeaderRetriever.start(new 
ResourceManagerLeaderListener());
-               }
-               catch (Throwable t) {
-                       log.error("Failed to start job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), t);
-
-                       handleFatalError(new Exception(
-                                       "Could not start job execution: Failed 
to start leader service for Resource Manager", t));
-
-                       return;
-               }
-
-               // start scheduling job in another thread
-               executor.execute(new Runnable() {
-                       @Override
-                       public void run() {
-                               try {
-                                       executionGraph.scheduleForExecution();
-                               }
-                               catch (Throwable t) {
-                                       executionGraph.failGlobal(t);
-                               }
-                       }
-               });
-       }
-
-       /**
-        * Suspending job, all the running tasks will be cancelled, and 
communication with other components
-        * will be disposed.
-        *
-        * <p>Mostly job is suspended because of the leadership has been 
revoked, one can be restart this job by
-        * calling the {@link #start(UUID)} method once we take the leadership 
back again.
-        *
-        * @param cause The reason of why this job been suspended.
-        */
-       @Override
-       public void suspendExecution(final Throwable cause) {
-               if (leaderSessionID == null) {
-                       log.debug("Job has already been suspended or 
shutdown.");
-                       return;
-               }
-
-               // not leader any more - should not accept any leader messages 
any more
-               leaderSessionID = null;
-
-               try {
-                       resourceManagerLeaderRetriever.stop();
-               } catch (Throwable t) {
-                       log.warn("Failed to stop resource manager leader 
retriever when suspending.", t);
-               }
-
-               // tell the execution graph (JobManager is still processing 
messages here) 
-               executionGraph.suspend(cause);
-
-               // receive no more messages until started again, should be 
called before we clear self leader id
-               stop();
-
-               // the slot pool stops receiving messages and clears its pooled 
slots 
-               slotPoolGateway.suspend();
-
-               // disconnect from resource manager:
-               closeResourceManagerConnection(new Exception("Execution was 
suspended.", cause));
-       }
-
-       
//----------------------------------------------------------------------------------------------
-
        /**
         * Updates the task execution state for a given task.
         *
@@ -863,6 +786,96 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
        // Internal methods
        
//----------------------------------------------------------------------------------------------
 
+       //-- job starting and stopping  
-----------------------------------------------------------------
+
+       private void startJobExecution() {
+               // double check that the leader status did not change
+               if (leaderSessionID == null) {
+                       log.info("Aborting job startup - JobManager lost leader 
status");
+                       return;
+               }
+
+               log.info("Starting execution of job {} ({})", 
jobGraph.getName(), jobGraph.getJobID());
+
+               try {
+                       // start the slot pool make sure the slot pool now 
accepts messages for this leader
+                       log.debug("Staring SlotPool component");
+                       slotPool.start(leaderSessionID, getAddress());
+               } catch (Exception e) {
+                       log.error("Faild to start job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), e);
+
+                       handleFatalError(new Exception("Could not start job 
execution: Failed to start the slot pool.", e));
+               }
+
+               try {
+                       // job is ready to go, try to establish connection with 
resource manager
+                       //   - activate leader retrieval for the resource 
manager
+                       //   - on notification of the leader, the connection 
will be established and
+                       //     the slot pool will start requesting slots
+                       resourceManagerLeaderRetriever.start(new 
ResourceManagerLeaderListener());
+               }
+               catch (Throwable t) {
+                       log.error("Failed to start job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), t);
+
+                       handleFatalError(new Exception(
+                               "Could not start job execution: Failed to start 
leader service for Resource Manager", t));
+
+                       return;
+               }
+
+               // start scheduling job in another thread
+               executor.execute(new Runnable() {
+                       @Override
+                       public void run() {
+                               try {
+                                       executionGraph.scheduleForExecution();
+                               }
+                               catch (Throwable t) {
+                                       executionGraph.failGlobal(t);
+                               }
+                       }
+               });
+       }
+
+       /**
+        * Suspending job, all the running tasks will be cancelled, and 
communication with other components
+        * will be disposed.
+        *
+        * <p>Mostly job is suspended because of the leadership has been 
revoked, one can be restart this job by
+        * calling the {@link #start(UUID)} method once we take the leadership 
back again.
+        *
+        * @param cause The reason of why this job been suspended.
+        */
+       private void suspendExecution(final Throwable cause) {
+               if (leaderSessionID == null) {
+                       log.debug("Job has already been suspended or 
shutdown.");
+                       return;
+               }
+
+               // not leader any more - should not accept any leader messages 
any more
+               leaderSessionID = null;
+
+               try {
+                       resourceManagerLeaderRetriever.stop();
+               } catch (Throwable t) {
+                       log.warn("Failed to stop resource manager leader 
retriever when suspending.", t);
+               }
+
+               // tell the execution graph (JobManager is still processing 
messages here)
+               executionGraph.suspend(cause);
+
+               // receive no more messages until started again, should be 
called before we clear self leader id
+               stop();
+
+               // the slot pool stops receiving messages and clears its pooled 
slots
+               slotPoolGateway.suspend();
+
+               // disconnect from resource manager:
+               closeResourceManagerConnection(new Exception("Execution was 
suspended.", cause));
+       }
+
+       
//----------------------------------------------------------------------------------------------
+
        private void handleFatalError(final Throwable cause) {
                runAsync(new Runnable() {
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/1fc4a609/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index b396cd6..bfa2930 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -52,16 +52,6 @@ import java.util.concurrent.CompletableFuture;
  */
 public interface JobMasterGateway extends CheckpointCoordinatorGateway {
 
-       // 
------------------------------------------------------------------------
-       //  Job start and stop methods
-       // 
------------------------------------------------------------------------
-
-       void startJobExecution();
-
-       void suspendExecution(Throwable cause);
-
-       // 
------------------------------------------------------------------------
-
        /**
         * Updates the task execution state for a given task.
         *

http://git-wip-us.apache.org/repos/asf/flink/blob/1fc4a609/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index 435c23d..998e803 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -209,7 +209,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
                assertTrue(!jobCompletion.isJobFinished());
 
                runner.revokeLeadership();
-               verify(jobManager).suspendExecution(any(Throwable.class));
+               verify(jobManager).suspend(any(Throwable.class));
                assertFalse(runner.isShutdown());
        }
 
@@ -224,7 +224,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
                assertTrue(!jobCompletion.isJobFinished());
 
                runner.revokeLeadership();
-               verify(jobManager).suspendExecution(any(Throwable.class));
+               verify(jobManager).suspend(any(Throwable.class));
                assertFalse(runner.isShutdown());
 
                UUID leaderSessionID2 = UUID.randomUUID();

Reply via email to