[FLINK-4400] [cluster mngmt] Implement leadership election among JobMasters

Adapt related components to the changes in HighAvailabilityServices

Add comments for getJobMasterLeaderElectionService in HighAvailabilityServices

This closes #2377.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8fd8c998
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8fd8c998
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8fd8c998

Branch: refs/heads/flip-6
Commit: 8fd8c998b39c313c532b5816213b79770173450b
Parents: 5ea97a1
Author: xiaogang.sxg <xiaogang....@alibaba-inc.com>
Authored: Wed Aug 17 13:46:00 2016 +0800
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Wed Sep 21 11:39:15 2016 +0200

----------------------------------------------------------------------
 .../HighAvailabilityServices.java               |   9 +
 .../runtime/highavailability/NonHaServices.java |   8 +
 .../flink/runtime/rpc/jobmaster/JobMaster.java  | 318 +++++++++----------
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |  53 +---
 4 files changed, 179 insertions(+), 209 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8fd8c998/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 094d36f..73e4f1f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.highavailability;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
 /**
@@ -36,4 +38,11 @@ public interface HighAvailabilityServices {
         * Gets the leader retriever for the cluster's resource manager.
         */
        LeaderRetrievalService getResourceManagerLeaderRetriever() throws 
Exception;
+
+       /**
+        * Gets the leader election service for the given job.
+        *
+        * @param jobID The identifier of the job running the election.
+        * @return The leader election service with which the JobMaster of the given job
+        *         contends for leadership.
+        * @throws Exception if the leader election service cannot be obtained.
+        */
+       LeaderElectionService getJobMasterLeaderElectionService(JobID jobID) 
throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8fd8c998/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index b8c2ed8..3d2769b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.runtime.highavailability;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import 
org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 
@@ -56,4 +59,9 @@ public class NonHaServices implements 
HighAvailabilityServices {
        public LeaderRetrievalService getResourceManagerLeaderRetriever() 
throws Exception {
                return new 
StandaloneLeaderRetrievalService(resourceManagerAddress, new UUID(0, 0));
        }
+
+       /**
+        * Gets the leader election service for the given job's JobMaster.
+        *
+        * <p>This non-HA implementation ignores {@code jobID} and creates a fresh
+        * {@link StandaloneLeaderElectionService} on every call.
+        * NOTE(review): presumably the standalone service grants leadership to its contender
+        * without any external coordination; confirm against StandaloneLeaderElectionService.
+        */
+       @Override
+       public LeaderElectionService getJobMasterLeaderElectionService(JobID 
jobID) throws Exception {
+               return new StandaloneLeaderElectionService();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8fd8c998/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
index e53cd68..49b200b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/jobmaster/JobMaster.java
@@ -18,68 +18,77 @@
 
 package org.apache.flink.runtime.rpc.jobmaster;
 
-import akka.dispatch.Futures;
-import akka.dispatch.Mapper;
-import akka.dispatch.OnComplete;
-import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobmanager.RecoveryMode;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcMethod;
-import org.apache.flink.runtime.rpc.resourcemanager.JobMasterRegistration;
-import org.apache.flink.runtime.rpc.resourcemanager.RegistrationResponse;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.util.Preconditions;
-import scala.Tuple2;
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 /**
  * JobMaster implementation. The job master is responsible for the execution 
of a single
  * {@link org.apache.flink.runtime.jobgraph.JobGraph}.
- *
+ * <p>
  * It offers the following methods as part of its rpc interface to interact 
with the JobMaster
  * remotely:
  * <ul>
- *     <li>{@link #registerAtResourceManager(String)} triggers the 
registration at the resource manager</li>
  *     <li>{@link #updateTaskExecutionState(TaskExecutionState)} updates the 
task execution state for
  * given task</li>
  * </ul>
  */
 public class JobMaster extends RpcEndpoint<JobMasterGateway> {
-       /** Execution context for future callbacks */
-       private final ExecutionContext executionContext;
-
-       /** Execution context for scheduled runnables */
-       private final ScheduledExecutorService scheduledExecutorService;
-
-       private final FiniteDuration initialRegistrationTimeout = new 
FiniteDuration(500, TimeUnit.MILLISECONDS);
-       private final FiniteDuration maxRegistrationTimeout = new 
FiniteDuration(30, TimeUnit.SECONDS);
-       private final FiniteDuration registrationDuration = new 
FiniteDuration(365, TimeUnit.DAYS);
-       private final long failedRegistrationDelay = 10000;
 
        /** Gateway to connected resource manager, null iff not connected */
        private ResourceManagerGateway resourceManager = null;
 
-       /** UUID to filter out old registration runs */
-       private UUID currentRegistrationRun;
+       /** Logical representation of the job */
+       private final JobGraph jobGraph;
+       private final JobID jobID;
+
+       /** Configuration of the job */
+       private final Configuration configuration;
+       private final RecoveryMode recoveryMode;
+
+       /** Service to contend for and retrieve the leadership of JM and RM */
+       private final HighAvailabilityServices highAvailabilityServices;
+
+       /** Leader Management */
+       private LeaderElectionService leaderElectionService = null;
+       private UUID leaderSessionID;
+
+       /**
+        * Creates a JobMaster for the given job.
+        *
+        * @param jobGraph The representation of the job's execution plan
+        * @param configuration The job's configuration
+        * @param rpcService The RPC service at which the JM serves
+        * @param highAvailabilityService The cluster's HA services, from which the JM can elect
+        *                                and retrieve leaders.
+        * @throws NullPointerException if jobGraph, its job ID, configuration or
+        *                              highAvailabilityService is null
+        */
+       public JobMaster(
+               JobGraph jobGraph,
+               Configuration configuration,
+               RpcService rpcService,
+               HighAvailabilityServices highAvailabilityService) {
 
-       public JobMaster(RpcService rpcService, ExecutorService 
executorService) {
                super(rpcService);
-               executionContext = ExecutionContext$.MODULE$.fromExecutor(
-                       Preconditions.checkNotNull(executorService));
-               scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
+
+               this.jobGraph = Preconditions.checkNotNull(jobGraph);
+               this.jobID = Preconditions.checkNotNull(jobGraph.getJobID());
+
+               this.configuration = Preconditions.checkNotNull(configuration);
+               // NOTE(review): recoveryMode is not read anywhere else in the code visible here.
+               this.recoveryMode = RecoveryMode.fromConfig(configuration);
+
+               this.highAvailabilityServices = 
Preconditions.checkNotNull(highAvailabilityService);
        }
 
        public ResourceManagerGateway getResourceManager() {
@@ -87,6 +96,91 @@ public class JobMaster extends RpcEndpoint<JobMasterGateway> 
{
        }
 
        
//----------------------------------------------------------------------------------------------
+       // Initialization methods
+       
//----------------------------------------------------------------------------------------------
+       /**
+        * Starts the RPC endpoint and then enters the leader election for this job.
+        *
+        * @throws RuntimeException if the job's leader election service cannot be obtained or started
+        */
+       @Override
+       public void start() {
+               super.start();
+
+               // register at the election once the JM starts
+               registerAtElectionService();
+       }
+
+       /**
+        * Leaves the leader election (if a service was obtained in {@link #start()}) and then shuts
+        * down the RPC endpoint, so that the election service does not outlive the JobMaster.
+        */
+       @Override
+       public void shutDown() {
+               if (leaderElectionService != null) {
+                       try {
+                               leaderElectionService.stop();
+                       } catch (Exception e) {
+                               // best effort: still shut down the endpoint even if leaving the election fails
+                               log.warn("Failed to stop the leader election service of JobMaster {}.",
+                                       getAddress(), e);
+                       }
+               }
+
+               super.shutDown();
+       }
+
+
+       
//----------------------------------------------------------------------------------------------
+       // JobMaster Leadership methods
+       
//----------------------------------------------------------------------------------------------
+
+       /**
+        * Retrieves this job's leader election service from the HA services and starts it with a
+        * {@link JobMasterLeaderContender}, so that this JobMaster contends for the leadership.
+        *
+        * @throws RuntimeException if the election service cannot be retrieved or started; the
+        *                          original exception is attached as the cause
+        */
+       private void registerAtElectionService() {
+               try {
+                       // NOTE(review): the field is assigned before start() is called, so if
+                       // start() fails it still refers to a service that was never started.
+                       leaderElectionService = 
highAvailabilityServices.getJobMasterLeaderElectionService(jobID);
+                       leaderElectionService.start(new 
JobMasterLeaderContender());
+               } catch (Exception e) {
+                       throw new RuntimeException("Fail to register at the 
election of JobMaster", e);
+               }
+       }
+
+       /**
+        * Called when this JobMaster is granted leadership. Via {@code runAsync}, it stores the new
+        * leader session ID and confirms it with the leader election service. Starting the job's
+        * execution is still a TODO.
+        *
+        * @param newLeaderSessionID The identifier of the new leadership session
+        */
+       public void grantJobMasterLeadership(final UUID newLeaderSessionID) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               log.info("JobManager {} grants leadership with 
session id {}.", getAddress(), newLeaderSessionID);
+
+                               // The following operation may block, but since the JM is idle until
+                               // it has been granted leadership, it is acceptable for the JM to wait
+                               // here until the operation has completed.
+                               leaderSessionID = newLeaderSessionID;
+                               
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
+
+                               // TODO:: execute the job when the leadership is granted.
+                       }
+               });
+       }
+
+       /**
+        * Called when this JobMaster's leadership is revoked. Via {@code runAsync}, it calls
+        * {@link #cancelAndClearEverything(Throwable)} and clears the leader session ID.
+        * Actually cancelling the job's execution is still a TODO.
+        */
+       public void revokeJobMasterLeadership() {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               log.info("JobManager {} was revoked 
leadership.", getAddress());
+
+                               // TODO:: cancel the job's execution and notify all listeners
+                               cancelAndClearEverything(new 
Exception("JobManager is no longer the leader."));
+
+                               leaderSessionID = null;
+                       }
+               });
+       }
+
+       /**
+        * Handles an error reported by the leader election service. Via {@code runAsync}, it logs
+        * the error, calls {@link #cancelAndClearEverything(Throwable)} and clears the leader
+        * session ID. Shutting down the JM is still a TODO.
+        *
+        * @param exception Exception reported by the leader election service
+        */
+       public void onJobMasterElectionError(final Exception exception) {
+               runAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               log.error("Received an error from the 
LeaderElectionService.", exception);
+
+                               // TODO:: cancel the job's execution and shutdown the JM
+                               cancelAndClearEverything(exception);
+
+                               leaderSessionID = null;
+                       }
+               });
+
+       }
+
+       
//----------------------------------------------------------------------------------------------
        // RPC methods
        
//----------------------------------------------------------------------------------------------
 
@@ -109,18 +203,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         */
        @RpcMethod
        public void registerAtResourceManager(final String address) {
-               currentRegistrationRun = UUID.randomUUID();
-
-               Future<ResourceManagerGateway> resourceManagerFuture = 
getRpcService().connect(address, ResourceManagerGateway.class);
-
-               handleResourceManagerRegistration(
-                       new JobMasterRegistration(getAddress()),
-                       1,
-                       resourceManagerFuture,
-                       currentRegistrationRun,
-                       initialRegistrationTimeout,
-                       maxRegistrationTimeout,
-                       registrationDuration.fromNow());
+               // TODO:: register at the RM. Until this is implemented, this RPC is a no-op and the
+               // given address is ignored.
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -128,124 +211,37 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        
//----------------------------------------------------------------------------------------------
 
        /**
-        * Helper method to handle the resource manager registration process. 
If a registration attempt
-        * times out, then a new attempt with the doubled time out is 
initiated. The whole registration
-        * process has a deadline. Once this deadline is overdue without 
successful registration, the
-        * job master shuts down.
+        * Cancel the current job and notify all listeners the job's 
cancellation.
         *
-        * @param jobMasterRegistration Job master registration info which is 
sent to the resource
-        *                              manager
-        * @param attemptNumber Registration attempt number
-        * @param resourceManagerFuture Future of the resource manager gateway
-        * @param registrationRun UUID describing the current registration run
-        * @param timeout Timeout of the last registration attempt
-        * @param maxTimeout Maximum timeout between registration attempts
-        * @param deadline Deadline for the registration
+        * @param cause Cause for the cancelling.
         */
-       void handleResourceManagerRegistration(
-               final JobMasterRegistration jobMasterRegistration,
-               final int attemptNumber,
-               final Future<ResourceManagerGateway> resourceManagerFuture,
-               final UUID registrationRun,
-               final FiniteDuration timeout,
-               final FiniteDuration maxTimeout,
-               final Deadline deadline) {
-
-               // filter out concurrent registration runs
-               if (registrationRun.equals(currentRegistrationRun)) {
-
-                       log.info("Start registration attempt #{}.", 
attemptNumber);
-
-                       if (deadline.isOverdue()) {
-                               // we've exceeded our registration deadline. 
This means that we have to shutdown the JobMaster
-                               log.error("Exceeded registration deadline 
without successfully registering at the ResourceManager.");
-                               shutDown();
-                       } else {
-                               Future<Tuple2<RegistrationResponse, 
ResourceManagerGateway>> registrationResponseFuture = 
resourceManagerFuture.flatMap(new Mapper<ResourceManagerGateway, 
Future<Tuple2<RegistrationResponse, ResourceManagerGateway>>>() {
-                                       @Override
-                                       public 
Future<Tuple2<RegistrationResponse, ResourceManagerGateway>> 
apply(ResourceManagerGateway resourceManagerGateway) {
-                                               return 
resourceManagerGateway.registerJobMaster(jobMasterRegistration, 
timeout).zip(Futures.successful(resourceManagerGateway));
-                                       }
-                               }, executionContext);
-
-                               registrationResponseFuture.onComplete(new 
OnComplete<Tuple2<RegistrationResponse, ResourceManagerGateway>>() {
-                                       @Override
-                                       public void onComplete(Throwable 
failure, Tuple2<RegistrationResponse, ResourceManagerGateway> tuple) throws 
Throwable {
-                                               if (failure != null) {
-                                                       if (failure instanceof 
TimeoutException) {
-                                                               // we haven't 
received an answer in the given timeout interval,
-                                                               // so increase 
it and try again.
-                                                               final 
FiniteDuration newTimeout = timeout.$times(2L).min(maxTimeout);
-
-                                                               
handleResourceManagerRegistration(
-                                                                       
jobMasterRegistration,
-                                                                       
attemptNumber + 1,
-                                                                       
resourceManagerFuture,
-                                                                       
registrationRun,
-                                                                       
newTimeout,
-                                                                       
maxTimeout,
-                                                                       
deadline);
-                                                       } else {
-                                                               
log.error("Received unknown error while registering at the ResourceManager.", 
failure);
-                                                               shutDown();
-                                                       }
-                                               } else {
-                                                       final 
RegistrationResponse response = tuple._1();
-                                                       final 
ResourceManagerGateway gateway = tuple._2();
-
-                                                       if 
(response.isSuccess()) {
-                                                               
finishResourceManagerRegistration(gateway, response.getInstanceID());
-                                                       } else {
-                                                               log.info("The 
registration was refused. Try again.");
-
-                                                               
scheduledExecutorService.schedule(new Runnable() {
-                                                                       
@Override
-                                                                       public 
void run() {
-                                                                               
// we have to execute scheduled runnable in the main thread
-                                                                               
// because we need consistency wrt currentRegistrationRun
-                                                                               
runAsync(new Runnable() {
-                                                                               
        @Override
-                                                                               
        public void run() {
-                                                                               
                // our registration attempt was refused. Start over.
-                                                                               
                handleResourceManagerRegistration(
-                                                                               
                        jobMasterRegistration,
-                                                                               
                        1,
-                                                                               
                        resourceManagerFuture,
-                                                                               
                        registrationRun,
-                                                                               
                        initialRegistrationTimeout,
-                                                                               
                        maxTimeout,
-                                                                               
                        deadline);
-                                                                               
        }
-                                                                               
});
-                                                                       }
-                                                               }, 
failedRegistrationDelay, TimeUnit.MILLISECONDS);
-                                                       }
-                                               }
-                                       }
-                               }, getMainThreadExecutionContext()); // use the 
main thread execution context to execute the call back in the main thread
-                       }
-               } else {
-                       log.info("Discard out-dated registration run.");
-               }
+       private void cancelAndClearEverything(Throwable cause) {
+               // currently, nothing to do here
+               // NOTE(review): `cause` is not used yet; the leadership-revocation and
+               // election-error handlers pass in the reason why the job should be cancelled.
        }
 
-       /**
-        * Finish the resource manager registration by setting the new resource 
manager gateway.
-        *
-        * @param resourceManager New resource manager gateway
-        * @param instanceID Instance id assigned by the resource manager
-        */
-       void finishResourceManagerRegistration(ResourceManagerGateway 
resourceManager, InstanceID instanceID) {
-               log.info("Successfully registered at the ResourceManager under 
instance id {}.", instanceID);
-               this.resourceManager = resourceManager;
-       }
+       // 
------------------------------------------------------------------------
+       //  Utility classes
+       // 
------------------------------------------------------------------------
+       /**
+        * {@link LeaderContender} registered with this job's {@link LeaderElectionService}. It
+        * forwards every callback to the enclosing JobMaster, whose leadership and error handlers
+        * perform their work via {@code runAsync}. It reports the JobMaster's own address as the
+        * contender address. It is an inner (non-static) class because it calls back into
+        * {@code JobMaster.this}.
+        */
+       private class JobMasterLeaderContender implements LeaderContender {
 
-       /**
-        * Return if the job master is connected to a resource manager.
-        *
-        * @return true if the job master is connected to the resource manager
-        */
-       public boolean isConnected() {
-               return resourceManager != null;
+               @Override
+               public void grantLeadership(UUID leaderSessionID) {
+                       
JobMaster.this.grantJobMasterLeadership(leaderSessionID);
+               }
+
+               @Override
+               public void revokeLeadership() {
+                       JobMaster.this.revokeJobMasterLeadership();
+               }
+
+               @Override
+               public String getAddress() {
+                       return JobMaster.this.getAddress();
+               }
+
+               @Override
+               public void handleError(Exception exception) {
+                       onJobMasterElectionError(exception);
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8fd8c998/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 7b4ab89..2790cf8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -20,9 +20,12 @@ package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorSystem;
 import akka.util.Timeout;
-
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.rpc.resourcemanager.ResourceManager;
@@ -31,6 +34,7 @@ import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import org.mockito.Mockito;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
@@ -80,51 +84,4 @@ public class AkkaRpcServiceTest extends TestLogger {
 
                assertTrue("call was not properly delayed", ((stop - start) / 
1000000) >= delay);
        }
-
-       // 
------------------------------------------------------------------------
-       //  specific component tests - should be moved to the test classes
-       //  for those components
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Tests that the {@link JobMaster} can connect to the {@link 
ResourceManager} using the
-        * {@link AkkaRpcService}.
-        */
-       @Test
-       public void testJobMasterResourceManagerRegistration() throws Exception 
{
-               Timeout akkaTimeout = new Timeout(10, TimeUnit.SECONDS);
-               ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
-               ActorSystem actorSystem2 = AkkaUtils.createDefaultActorSystem();
-               AkkaRpcService akkaRpcService = new AkkaRpcService(actorSystem, 
akkaTimeout);
-               AkkaRpcService akkaRpcService2 = new 
AkkaRpcService(actorSystem2, akkaTimeout);
-               ExecutorService executorService = new ForkJoinPool();
-
-               ResourceManager resourceManager = new 
ResourceManager(akkaRpcService, executorService);
-               JobMaster jobMaster = new JobMaster(akkaRpcService2, 
executorService);
-
-               resourceManager.start();
-               jobMaster.start();
-
-               ResourceManagerGateway rm = resourceManager.getSelf();
-
-               assertTrue(rm instanceof AkkaGateway);
-
-               AkkaGateway akkaClient = (AkkaGateway) rm;
-
-               
-               
jobMaster.registerAtResourceManager(AkkaUtils.getAkkaURL(actorSystem, 
akkaClient.getRpcEndpoint()));
-
-               // wait for successful registration
-               FiniteDuration timeout = new FiniteDuration(200, 
TimeUnit.SECONDS);
-               Deadline deadline = timeout.fromNow();
-
-               while (deadline.hasTimeLeft() && !jobMaster.isConnected()) {
-                       Thread.sleep(100);
-               }
-
-               assertFalse(deadline.isOverdue());
-
-               jobMaster.shutDown();
-               resourceManager.shutDown();
-       }
 }

Reply via email to