[FLINK-4400] [cluster mngmt] Implement leadership election among JobMasters
Adapt related components to the changes in HighAvailabilityServices. Add comments for getJobMasterLeaderElectionService in HighAvailabilityServices. This closes #2377. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81a35c18 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81a35c18 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81a35c18 Branch: refs/heads/flip-6 Commit: 81a35c182fff7dc666f8d88c8c58ba9a94038e05 Parents: 223eb6f Author: xiaogang.sxg <[email protected]> Authored: Wed Aug 17 13:46:00 2016 +0800 Committer: Till Rohrmann <[email protected]> Committed: Thu Sep 8 17:26:59 2016 +0200 ---------------------------------------------------------------------- .../HighAvailabilityServices.java | 9 + .../runtime/highavailability/NonHaServices.java | 8 + .../flink/runtime/rpc/jobmaster/JobMaster.java | 318 +++++++++---------- .../runtime/rpc/akka/AkkaRpcServiceTest.java | 53 +--- 4 files changed, 179 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/81a35c18/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java index 094d36f..73e4f1f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.highavailability; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; /** @@ -36,4 +38,11 @@ public interface HighAvailabilityServices { * Gets the leader retriever for the cluster's resource manager. */ LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception; + + /** + * Gets the leader election service for the given job. + * + * @param jobID The identifier of the job running the election. + */ + LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/81a35c18/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java index b8c2ed8..3d2769b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java @@ -18,6 +18,9 @@ package org.apache.flink.runtime.highavailability; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService; @@ -56,4 +59,9 @@ public class NonHaServices implements HighAvailabilityServices { public LeaderRetrievalService getResourceManagerLeaderRetriever() throws Exception { return new StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0)); } + + @Override + public LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) throws Exception { + return new StandaloneLeaderElectionService(); + } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/81a35c18/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java index e53cd68..49b200b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java @@ -18,68 +18,77 @@ package org.apache.flink.runtime.rpc.jobmaster; -import akka.dispatch.Futures; -import akka.dispatch.Mapper; -import akka.dispatch.OnComplete; -import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.rpc.RpcMethod; -import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration; -import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse; import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.util.Preconditions; -import scala.Tuple2; -import scala.concurrent.ExecutionContext; -import scala.concurrent.ExecutionContext$; -import scala.concurrent.Future; -import scala.concurrent.duration.Deadline; -import 
scala.concurrent.duration.FiniteDuration; import java.util.UUID; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * JobMaster implementation. The job master is responsible for the execution of a single * {@link org.apache.flink.runtime.jobgraph.JobGraph}. - * + * <p> * It offers the following methods as part of its rpc interface to interact with the JobMaster * remotely: * <ul> - * <li>{@link #registerAtResourceManager(String)} triggers the registration at the resource manager</li> * <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the task execution state for * given task</li> * </ul> */ public class JobMaster extends RpcEndpoint<JobMasterGateway> { - /** Execution context for future callbacks */ - private final ExecutionContext executionContext; - - /** Execution context for scheduled runnables */ - private final ScheduledExecutorService scheduledExecutorService; - - private final FiniteDuration initialRegistrationTimeout = new FiniteDuration(500, TimeUnit.MILLISECONDS); - private final FiniteDuration maxRegistrationTimeout = new FiniteDuration(30, TimeUnit.SECONDS); - private final FiniteDuration registrationDuration = new FiniteDuration(365, TimeUnit.DAYS); - private final long failedRegistrationDelay = 10000; /** Gateway to connected resource manager, null iff not connected */ private ResourceManagerGateway resourceManager = null; - /** UUID to filter out old registration runs */ - private UUID currentRegistrationRun; + /** Logical representation of the job */ + private final JobGraph jobGraph; + private final JobID jobID; + + /** Configuration of the job */ + private final Configuration configuration; + private final RecoveryMode recoveryMode; + + /** Service to contend for and retrieve the leadership of JM and RM */ + private final 
HighAvailabilityServices highAvailabilityServices; + + /** Leader Management */ + private LeaderElectionService leaderElectionService = null; + private UUID leaderSessionID; + + /** + * The JM's Constructor + * + * @param jobGraph The representation of the job's execution plan + * @param configuration The job's configuration + * @param rpcService The RPC service at which the JM serves + * @param highAvailabilityService The cluster's HA service from which the JM can elect and retrieve leaders. + */ + public JobMaster( + JobGraph jobGraph, + Configuration configuration, + RpcService rpcService, + HighAvailabilityServices highAvailabilityService) { - public JobMaster(RpcService rpcService, ExecutorService executorService) { super(rpcService); - executionContext = ExecutionContext$.MODULE$.fromExecutor( - Preconditions.checkNotNull(executorService)); - scheduledExecutorService = new ScheduledThreadPoolExecutor(1); + + this.jobGraph = Preconditions.checkNotNull(jobGraph); + this.jobID = Preconditions.checkNotNull(jobGraph.getJobID()); + + this.configuration = Preconditions.checkNotNull(configuration); + this.recoveryMode = RecoveryMode.fromConfig(configuration); + + this.highAvailabilityServices = Preconditions.checkNotNull(highAvailabilityService); } public ResourceManagerGateway getResourceManager() { @@ -87,6 +96,91 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { } //---------------------------------------------------------------------------------------------- + // Initialization methods + //---------------------------------------------------------------------------------------------- + public void start() { + super.start(); + + // register at the election once the JM starts + registerAtElectionService(); + } + + + //---------------------------------------------------------------------------------------------- + // JobMaster Leadership methods + //---------------------------------------------------------------------------------------------- + + /** + * 
Retrieves the election service and contends for the leadership. + */ + private void registerAtElectionService() { + try { + leaderElectionService = highAvailabilityServices.getJobMasterLeaderElectionService(jobID); + leaderElectionService.start(new JobMasterLeaderContender()); + } catch (Exception e) { + throw new RuntimeException("Fail to register at the election of JobMaster", e); + } + } + + /** + * Start the execution when the leadership is granted. + * + * @param newLeaderSessionID The identifier of the new leadership session + */ + public void grantJobMasterLeadership(final UUID newLeaderSessionID) { + runAsync(new Runnable() { + @Override + public void run() { + log.info("JobManager {} grants leadership with session id {}.", getAddress(), newLeaderSessionID); + + // The operation may be blocking, but since the JM is idle before it is granted the leadership, it's okay that + // the JM waits here for the operation's completion. + leaderSessionID = newLeaderSessionID; + leaderElectionService.confirmLeaderSessionID(newLeaderSessionID); + + // TODO:: execute the job when the leadership is granted. + } + }); + } + + /** + * Stop the execution when the leadership is revoked. 
+ */ + public void revokeJobMasterLeadership() { + runAsync(new Runnable() { + @Override + public void run() { + log.info("JobManager {} was revoked leadership.", getAddress()); + + // TODO:: cancel the job's execution and notify all listeners + cancelAndClearEverything(new Exception("JobManager is no longer the leader.")); + + leaderSessionID = null; + } + }); + } + + /** + * Handles error occurring in the leader election service + * + * @param exception Exception thrown in the leader election service + */ + public void onJobMasterElectionError(final Exception exception) { + runAsync(new Runnable() { + @Override + public void run() { + log.error("Received an error from the LeaderElectionService.", exception); + + // TODO:: cancel the job's execution and shutdown the JM + cancelAndClearEverything(exception); + + leaderSessionID = null; + } + }); + + } + + //---------------------------------------------------------------------------------------------- // RPC methods //---------------------------------------------------------------------------------------------- @@ -109,18 +203,7 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { */ @RpcMethod public void registerAtResourceManager(final String address) { - currentRegistrationRun = UUID.randomUUID(); - - Future<ResourceManagerGateway> resourceManagerFuture = getRpcService().connect(address, ResourceManagerGateway.class); - - handleResourceManagerRegistration( - new JobMasterRegistration(getAddress()), - 1, - resourceManagerFuture, - currentRegistrationRun, - initialRegistrationTimeout, - maxRegistrationTimeout, - registrationDuration.fromNow()); + //TODO:: register at the RM } //---------------------------------------------------------------------------------------------- @@ -128,124 +211,37 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> { //---------------------------------------------------------------------------------------------- /** - * Helper method to handle the resource manager 
registration process. If a registration attempt - * times out, then a new attempt with the doubled time out is initiated. The whole registration - * process has a deadline. Once this deadline is overdue without successful registration, the - * job master shuts down. + * Cancel the current job and notify all listeners of the job's cancellation. * - * @param jobMasterRegistration Job master registration info which is sent to the resource - * manager - * @param attemptNumber Registration attempt number - * @param resourceManagerFuture Future of the resource manager gateway - * @param registrationRun UUID describing the current registration run - * @param timeout Timeout of the last registration attempt - * @param maxTimeout Maximum timeout between registration attempts - * @param deadline Deadline for the registration + * @param cause Cause for the cancellation. */ - void handleResourceManagerRegistration( - final JobMasterRegistration jobMasterRegistration, - final int attemptNumber, - final Future<ResourceManagerGateway> resourceManagerFuture, - final UUID registrationRun, - final FiniteDuration timeout, - final FiniteDuration maxTimeout, - final Deadline deadline) { - - // filter out concurrent registration runs - if (registrationRun.equals(currentRegistrationRun)) { - - log.info("Start registration attempt #{}.", attemptNumber); - - if (deadline.isOverdue()) { - // we've exceeded our registration deadline. 
This means that we have to shutdown the JobMaster - log.error("Exceeded registration deadline without successfully registering at the ResourceManager."); - shutDown(); - } else { - Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> registrationResponseFuture = resourceManagerFuture.flatMap(new Mapper<ResourceManagerGateway, Future<Tuple2<RegistrationResponse, ResourceManagerGateway>>>() { - @Override - public Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> apply(ResourceManagerGateway resourceManagerGateway) { - return resourceManagerGateway.registerJobMaster(jobMasterRegistration, timeout).zip(Futures.successful(resourceManagerGateway)); - } - }, executionContext); - - registrationResponseFuture.onComplete(new OnComplete<Tuple2<RegistrationResponse, ResourceManagerGateway>>() { - @Override - public void onComplete(Throwable failure, Tuple2<RegistrationResponse, ResourceManagerGateway> tuple) throws Throwable { - if (failure != null) { - if (failure instanceof TimeoutException) { - // we haven't received an answer in the given timeout interval, - // so increase it and try again. - final FiniteDuration newTimeout = timeout.$times(2L).min(maxTimeout); - - handleResourceManagerRegistration( - jobMasterRegistration, - attemptNumber + 1, - resourceManagerFuture, - registrationRun, - newTimeout, - maxTimeout, - deadline); - } else { - log.error("Received unknown error while registering at the ResourceManager.", failure); - shutDown(); - } - } else { - final RegistrationResponse response = tuple._1(); - final ResourceManagerGateway gateway = tuple._2(); - - if (response.isSuccess()) { - finishResourceManagerRegistration(gateway, response.getInstanceID()); - } else { - log.info("The registration was refused. 
Try again."); - - scheduledExecutorService.schedule(new Runnable() { - @Override - public void run() { - // we have to execute scheduled runnable in the main thread - // because we need consistency wrt currentRegistrationRun - runAsync(new Runnable() { - @Override - public void run() { - // our registration attempt was refused. Start over. - handleResourceManagerRegistration( - jobMasterRegistration, - 1, - resourceManagerFuture, - registrationRun, - initialRegistrationTimeout, - maxTimeout, - deadline); - } - }); - } - }, failedRegistrationDelay, TimeUnit.MILLISECONDS); - } - } - } - }, getMainThreadExecutionContext()); // use the main thread execution context to execute the call back in the main thread - } - } else { - log.info("Discard out-dated registration run."); - } + private void cancelAndClearEverything(Throwable cause) { + // currently, nothing to do here } - /** - * Finish the resource manager registration by setting the new resource manager gateway. - * - * @param resourceManager New resource manager gateway - * @param instanceID Instance id assigned by the resource manager - */ - void finishResourceManagerRegistration(ResourceManagerGateway resourceManager, InstanceID instanceID) { - log.info("Successfully registered at the ResourceManager under instance id {}.", instanceID); - this.resourceManager = resourceManager; - } + // ------------------------------------------------------------------------ + // Utility classes + // ------------------------------------------------------------------------ + private class JobMasterLeaderContender implements LeaderContender { - /** - * Return if the job master is connected to a resource manager. 
- * - * @return true if the job master is connected to the resource manager - */ - public boolean isConnected() { - return resourceManager != null; + @Override + public void grantLeadership(UUID leaderSessionID) { + JobMaster.this.grantJobMasterLeadership(leaderSessionID); + } + + @Override + public void revokeLeadership() { + JobMaster.this.revokeJobMasterLeadership(); + } + + @Override + public String getAddress() { + return JobMaster.this.getAddress(); + } + + @Override + public void handleError(Exception exception) { + onJobMasterElectionError(exception); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/81a35c18/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index 7b4ab89..2790cf8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -20,9 +20,12 @@ package org.apache.flink.runtime.rpc.akka; import akka.actor.ActorSystem; import akka.util.Timeout; - import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.rpc.jobmaster.JobMaster; import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager; @@ -31,6 +34,7 @@ import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Test; +import 
org.mockito.Mockito; import scala.concurrent.duration.Deadline; import scala.concurrent.duration.FiniteDuration; @@ -80,51 +84,4 @@ public class AkkaRpcServiceTest extends TestLogger { assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay); } - - // ------------------------------------------------------------------------ - // specific component tests - should be moved to the test classes - // for those components - // ------------------------------------------------------------------------ - - /** - * Tests that the {@link JobMaster} can connect to the {@link ResourceManager} using the - * {@link AkkaRpcService}. - */ - @Test - public void testJobMasterResourceManagerRegistration() throws Exception { - Timeout akkaTimeout = new Timeout(10, TimeUnit.SECONDS); - ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); - ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem(); - AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, akkaTimeout); - AkkaRpcService akkaRpcService2 = new AkkaRpcService(actorSystem2, akkaTimeout); - ExecutorService executorService = new ForkJoinPool(); - - ResourceManager resourceManager = new ResourceManager(akkaRpcService, executorService); - JobMaster jobMaster = new JobMaster(akkaRpcService2, executorService); - - resourceManager.start(); - jobMaster.start(); - - ResourceManagerGateway rm = resourceManager.getSelf(); - - assertTrue(rm instanceof AkkaGateway); - - AkkaGateway akkaClient = (AkkaGateway) rm; - - - jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, akkaClient.getRpcEndpoint())); - - // wait for successful registration - FiniteDuration timeout = new FiniteDuration(200, TimeUnit.SECONDS); - Deadline deadline = timeout.fromNow(); - - while (deadline.hasTimeLeft() && !jobMaster.isConnected()) { - Thread.sleep(100); - } - - assertFalse(deadline.isOverdue()); - - jobMaster.shutDown(); - resourceManager.shutDown(); - } }
