Repository: flink Updated Branches: refs/heads/master fb3bd1fce -> 1fc4a6097
[FLINK-7489] Remove startJobExecution and suspendExecution from JobMasterGateway. The job lifecycle methods should not be exposed as RPCs. Therefore, this commit removes them from the JobMasterGateway definition. This closes #4573. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1fc4a609 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1fc4a609 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1fc4a609 Branch: refs/heads/master Commit: 1fc4a609761445ecf77c374ae9daec8a1ada2618 Parents: fb3bd1f Author: Till Rohrmann <trohrm...@apache.org> Authored: Tue Aug 22 16:33:05 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Sun Sep 3 23:05:12 2017 +0200 ---------------------------------------------------------------------- .../runtime/jobmaster/JobManagerRunner.java | 2 +- .../flink/runtime/jobmaster/JobMaster.java | 199 ++++++++++--------- .../runtime/jobmaster/JobMasterGateway.java | 10 - .../jobmaster/JobManagerRunnerMockTest.java | 4 +- 4 files changed, 109 insertions(+), 106 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1fc4a609/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java index c312cd3..8f1be4c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerRunner.java @@ -434,7 +434,7 @@ public class JobManagerRunner implements LeaderContender, OnCompletionActions, F log.info("JobManager for job {} ({}) was revoked leadership at {}.", jobGraph.getName(), jobGraph.getJobID(), 
getAddress()); - jobManager.getSelfGateway(JobMasterGateway.class).suspendExecution(new Exception("JobManager is no longer the leader.")); + jobManager.suspend(new Exception("JobManager is no longer the leader.")); } } http://git-wip-us.apache.org/repos/asf/flink/blob/1fc4a609/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index a8a8632..c30749c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -318,7 +318,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { super.start(); log.info("JobManager started as leader {} for job {}", leaderSessionID, jobGraph.getJobID()); - selfGateway.startJobExecution(); + runAsync(this::startJobExecution); } else { log.warn("Job already started with leader ID {}, ignoring this start request.", leaderSessionID); @@ -326,6 +326,21 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { } /** + * Suspending job, all the running tasks will be cancelled, and communication with other components + * will be disposed. + * + * <p>Mostly job is suspended because of the leadership has been revoked, one can be restart this job by + * calling the {@link #start(UUID)} method once we take the leadership back again. + * + * <p>This method is executed asynchronously + * + * @param cause The reason of why this job been suspended. + */ + public void suspend(final Throwable cause) { + runAsync(() -> suspendExecution(cause)); + } + + /** * Suspend the job and shutdown all other services including rpc. 
*/ @Override @@ -343,98 +358,6 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { // RPC methods //---------------------------------------------------------------------------------------------- - //-- job starting and stopping ----------------------------------------------------------------- - - @Override - public void startJobExecution() { - // double check that the leader status did not change - if (leaderSessionID == null) { - log.info("Aborting job startup - JobManager lost leader status"); - return; - } - - log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID()); - - try { - // start the slot pool make sure the slot pool now accepts messages for this leader - log.debug("Staring SlotPool component"); - slotPool.start(leaderSessionID, getAddress()); - } catch (Exception e) { - log.error("Faild to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), e); - - handleFatalError(new Exception("Could not start job execution: Failed to start the slot pool.", e)); - } - - try { - // job is ready to go, try to establish connection with resource manager - // - activate leader retrieval for the resource manager - // - on notification of the leader, the connection will be established and - // the slot pool will start requesting slots - resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()); - } - catch (Throwable t) { - log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t); - - handleFatalError(new Exception( - "Could not start job execution: Failed to start leader service for Resource Manager", t)); - - return; - } - - // start scheduling job in another thread - executor.execute(new Runnable() { - @Override - public void run() { - try { - executionGraph.scheduleForExecution(); - } - catch (Throwable t) { - executionGraph.failGlobal(t); - } - } - }); - } - - /** - * Suspending job, all the running tasks will be cancelled, and communication with other components - 
* will be disposed. - * - * <p>Mostly job is suspended because of the leadership has been revoked, one can be restart this job by - * calling the {@link #start(UUID)} method once we take the leadership back again. - * - * @param cause The reason of why this job been suspended. - */ - @Override - public void suspendExecution(final Throwable cause) { - if (leaderSessionID == null) { - log.debug("Job has already been suspended or shutdown."); - return; - } - - // not leader any more - should not accept any leader messages any more - leaderSessionID = null; - - try { - resourceManagerLeaderRetriever.stop(); - } catch (Throwable t) { - log.warn("Failed to stop resource manager leader retriever when suspending.", t); - } - - // tell the execution graph (JobManager is still processing messages here) - executionGraph.suspend(cause); - - // receive no more messages until started again, should be called before we clear self leader id - stop(); - - // the slot pool stops receiving messages and clears its pooled slots - slotPoolGateway.suspend(); - - // disconnect from resource manager: - closeResourceManagerConnection(new Exception("Execution was suspended.", cause)); - } - - //---------------------------------------------------------------------------------------------- - /** * Updates the task execution state for a given task. 
* @@ -863,6 +786,96 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { // Internal methods //---------------------------------------------------------------------------------------------- + //-- job starting and stopping ----------------------------------------------------------------- + + private void startJobExecution() { + // double check that the leader status did not change + if (leaderSessionID == null) { + log.info("Aborting job startup - JobManager lost leader status"); + return; + } + + log.info("Starting execution of job {} ({})", jobGraph.getName(), jobGraph.getJobID()); + + try { + // start the slot pool make sure the slot pool now accepts messages for this leader + log.debug("Staring SlotPool component"); + slotPool.start(leaderSessionID, getAddress()); + } catch (Exception e) { + log.error("Faild to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), e); + + handleFatalError(new Exception("Could not start job execution: Failed to start the slot pool.", e)); + } + + try { + // job is ready to go, try to establish connection with resource manager + // - activate leader retrieval for the resource manager + // - on notification of the leader, the connection will be established and + // the slot pool will start requesting slots + resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener()); + } + catch (Throwable t) { + log.error("Failed to start job {} ({})", jobGraph.getName(), jobGraph.getJobID(), t); + + handleFatalError(new Exception( + "Could not start job execution: Failed to start leader service for Resource Manager", t)); + + return; + } + + // start scheduling job in another thread + executor.execute(new Runnable() { + @Override + public void run() { + try { + executionGraph.scheduleForExecution(); + } + catch (Throwable t) { + executionGraph.failGlobal(t); + } + } + }); + } + + /** + * Suspending job, all the running tasks will be cancelled, and communication with other components + * will be 
disposed. + * + * <p>Mostly job is suspended because of the leadership has been revoked, one can be restart this job by + * calling the {@link #start(UUID)} method once we take the leadership back again. + * + * @param cause The reason of why this job been suspended. + */ + private void suspendExecution(final Throwable cause) { + if (leaderSessionID == null) { + log.debug("Job has already been suspended or shutdown."); + return; + } + + // not leader any more - should not accept any leader messages any more + leaderSessionID = null; + + try { + resourceManagerLeaderRetriever.stop(); + } catch (Throwable t) { + log.warn("Failed to stop resource manager leader retriever when suspending.", t); + } + + // tell the execution graph (JobManager is still processing messages here) + executionGraph.suspend(cause); + + // receive no more messages until started again, should be called before we clear self leader id + stop(); + + // the slot pool stops receiving messages and clears its pooled slots + slotPoolGateway.suspend(); + + // disconnect from resource manager: + closeResourceManagerConnection(new Exception("Execution was suspended.", cause)); + } + + //---------------------------------------------------------------------------------------------- + private void handleFatalError(final Throwable cause) { runAsync(new Runnable() { @Override http://git-wip-us.apache.org/repos/asf/flink/blob/1fc4a609/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index b396cd6..bfa2930 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -52,16 +52,6 @@ import 
java.util.concurrent.CompletableFuture; */ public interface JobMasterGateway extends CheckpointCoordinatorGateway { - // ------------------------------------------------------------------------ - // Job start and stop methods - // ------------------------------------------------------------------------ - - void startJobExecution(); - - void suspendExecution(Throwable cause); - - // ------------------------------------------------------------------------ - /** * Updates the task execution state for a given task. * http://git-wip-us.apache.org/repos/asf/flink/blob/1fc4a609/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java index 435c23d..998e803 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java @@ -209,7 +209,7 @@ public class JobManagerRunnerMockTest extends TestLogger { assertTrue(!jobCompletion.isJobFinished()); runner.revokeLeadership(); - verify(jobManager).suspendExecution(any(Throwable.class)); + verify(jobManager).suspend(any(Throwable.class)); assertFalse(runner.isShutdown()); } @@ -224,7 +224,7 @@ public class JobManagerRunnerMockTest extends TestLogger { assertTrue(!jobCompletion.isJobFinished()); runner.revokeLeadership(); - verify(jobManager).suspendExecution(any(Throwable.class)); + verify(jobManager).suspend(any(Throwable.class)); assertFalse(runner.isShutdown()); UUID leaderSessionID2 = UUID.randomUUID();