Repository: flink
Updated Branches:
  refs/heads/flip-6 b5f6a06b0 -> 4076bd748


[FLINK-4537] [cluster management] ResourceManager registration with JobManager


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8b2271bd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8b2271bd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8b2271bd

Branch: refs/heads/flip-6
Commit: 8b2271bdd8197e4ca051bc4c9c3d90de88c986ce
Parents: b5f6a06
Author: beyond1920 <beyond1...@126.com>
Authored: Thu Sep 1 15:27:20 2016 +0800
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Sep 21 21:53:16 2016 +0200

----------------------------------------------------------------------
 .../HighAvailabilityServices.java               |   9 ++
 .../runtime/highavailability/NonHaServices.java |  19 +++
 .../jobmaster/JobMasterRegistrationSuccess.java |  49 ++++++
 .../resourcemanager/JobMasterRegistration.java  |  39 ++++-
 .../resourcemanager/ResourceManager.java        | 125 +++++++++++++--
 .../resourcemanager/ResourceManagerGateway.java |  34 ++--
 .../exceptions/LeaderSessionIDException.java    |  60 +++++++
 .../runtime/taskexecutor/TaskExecutor.java      |   5 +
 .../TestingHighAvailabilityServices.java        |  17 ++
 .../resourcemanager/ResourceManagerTest.java    | 160 +++++++++++++++++++
 10 files changed, 483 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 298147c..7634176 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -40,6 +40,15 @@ public interface HighAvailabilityServices {
        LeaderRetrievalService getResourceManagerLeaderRetriever() throws 
Exception;
 
        /**
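+        * Gets the leader retriever for the JobMaster which is responsible for the given job.
+        *
+        * @param jobID The identifier of the job.
+        * @return The leader retrieval service for the JobMaster of the given job
+        * @throws Exception if the leader retriever cannot be provided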
+        */
+       LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) throws 
Exception;
+
+       /**
         * Gets the leader election service for the cluster's resource manager.
         * @return
         * @throws Exception

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 292a404..33dc2d7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
+import 
org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -42,6 +43,8 @@ public class NonHaServices implements 
HighAvailabilityServices {
        /** The fix address of the ResourceManager */
        private final String resourceManagerAddress;
 
+       private final ConcurrentHashMap<JobID, String> jobMastersAddress;
+
        /**
         * Creates a new services class for the fix pre-defined leaders.
         * 
@@ -49,6 +52,17 @@ public class NonHaServices implements 
HighAvailabilityServices {
         */
        public NonHaServices(String resourceManagerAddress) {
                this.resourceManagerAddress = 
checkNotNull(resourceManagerAddress);
+               this.jobMastersAddress = new ConcurrentHashMap<>(16);
+       }
+
+       /**
+        * Binds address of a specified job master
+        *
+        * @param jobID            JobID for the specified job master
+        * @param jobMasterAddress address for the specified job master
+        */
+       public void bindJobMasterLeaderAddress(JobID jobID, String 
jobMasterAddress) {
+               jobMastersAddress.put(jobID, jobMasterAddress);
        }
 
        // 
------------------------------------------------------------------------
@@ -61,6 +75,11 @@ public class NonHaServices implements 
HighAvailabilityServices {
        }
 
        @Override
+       public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) 
throws Exception {
+               return new 
StandaloneLeaderRetrievalService(jobMastersAddress.get(jobID), new UUID(0, 0));
+       }
+
+       @Override
        public LeaderElectionService getResourceManagerLeaderElectionService() 
throws Exception {
                return new StandaloneLeaderElectionService();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
new file mode 100644
index 0000000..031c38e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.registration.RegistrationResponse;
+
+/**
+ * Response returned by the ResourceManager to a JobMaster whose registration attempt
+ * succeeded. It carries the interval in which the ResourceManager will heartbeat
+ * the JobMaster.
+ */
+public class JobMasterRegistrationSuccess extends RegistrationResponse.Success 
{
+
+       private static final long serialVersionUID = 5577641250204140415L;
+
+       private final long heartbeatInterval;
+
+       public JobMasterRegistrationSuccess(long heartbeatInterval) {
+               this.heartbeatInterval = heartbeatInterval;
+       }
+
+       /**
+        * Gets the interval in which the ResourceManager will heartbeat the JobMaster.
+        * NOTE(review): the time unit is not stated in this class; presumably milliseconds, confirm.
+        *
+        * @return the interval in which the ResourceManager will heartbeat the JobMaster
+        */
+       public long getHeartbeatInterval() {
+               return heartbeatInterval;
+       }
+
+       @Override
+       public String toString() {
+               return "JobMasterRegistrationSuccess(" + heartbeatInterval + 
')';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
index 439e56b..7b8ec70 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
@@ -18,23 +18,56 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+<<<<<<< HEAD
 import org.apache.flink.api.common.JobID;
+=======
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+>>>>>>> db98efb... rsourceManager registration with JobManager
 
 import java.io.Serializable;
+import java.util.UUID;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This class groups the JobMasterGateway and the leader session ID of a registered job master.
+ */
 public class JobMasterRegistration implements Serializable {
-       private static final long serialVersionUID = 8411214999193765202L;
 
+<<<<<<< HEAD
        private final String address;
        private final JobID jobID;
 
        public JobMasterRegistration(String address, JobID jobID) {
                this.address = address;
                this.jobID = jobID;
+=======
+       private static final long serialVersionUID = -2316627821716999527L;
+
+       private final JobMasterGateway jobMasterGateway;
+
+       private UUID jobMasterLeaderSessionID;
+
+       public JobMasterRegistration(JobMasterGateway jobMasterGateway) {
+               this.jobMasterGateway = checkNotNull(jobMasterGateway);
+       }
+
+       public JobMasterRegistration(JobMasterGateway jobMasterGateway, UUID 
jobMasterLeaderSessionID) {
+               this.jobMasterGateway = checkNotNull(jobMasterGateway);
+               this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
+       }
+
+       public JobMasterGateway getJobMasterGateway() {
+               return jobMasterGateway;
+       }
+
+       /**
+        * Updates the leader session ID recorded for the registered job master.
+        *
+        * @param leaderSessionID the new leader session ID of the job master
+        */
+       public void setJobMasterLeaderSessionID(UUID leaderSessionID) {
+               // Assign the parameter; the previous code assigned the field to itself,
+               // so the stored session ID was never updated.
+               this.jobMasterLeaderSessionID = leaderSessionID;
+>>>>>>> db98efb... rsourceManager registration with JobManager
        }
 
-       public String getAddress() {
-               return address;
+       public UUID getJobMasterLeaderSessionID() {
+               return jobMasterLeaderSessionID;
        }
 
        public JobID getJobID() {

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 5370710..8be1455 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
+import akka.dispatch.Futures;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -25,15 +26,22 @@ import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
+<<<<<<< HEAD
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+=======
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+>>>>>>> db98efb... rsourceManager registration with JobManager
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -50,15 +58,21 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * It offers the following methods as part of its rpc interface to interact 
with the him remotely:
  * <ul>
- *     <li>{@link #registerJobMaster(JobMasterRegistration)} registers a 
{@link JobMaster} at the resource manager</li>
+ *     <li>{@link #registerJobMaster(UUID, String, JobID)} registers a {@link 
JobMaster} at the resource manager</li>
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource 
manager</li>
  * </ul>
  */
+<<<<<<< HEAD
 public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> 
implements LeaderContender {
 
        private final Logger LOG = LoggerFactory.getLogger(getClass());
 
        private final Map<JobID, JobMasterGateway> jobMasterGateways;
+=======
+public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
+       /** Maps the JobID of each registered job to the JobMasterRegistration of its JobMaster */
+       private final Map<JobID, JobMasterRegistration> jobMasters;
+>>>>>>> db98efb... rsourceManager registration with JobManager
 
        private final HighAvailabilityServices highAvailabilityServices;
 
@@ -74,8 +88,12 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
                        SlotManager slotManager) {
                super(rpcService);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
+<<<<<<< HEAD
                this.jobMasterGateways = new HashMap<>();
                this.slotManager = slotManager;
+=======
+               this.jobMasters = new HashMap<>(16);
+>>>>>>> db98efb... rsourceManager registration with JobManager
        }
 
        @Override
@@ -95,8 +113,11 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
        public void shutDown() {
                try {
                        leaderElectionService.stop();
+                       for(JobID jobID : jobMasters.keySet()) {
+                               
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
+                       }
                        super.shutDown();
-               } catch(Throwable e) {
+               } catch (Throwable e) {
                        log.error("A fatal error happened when shutdown the 
ResourceManager", e);
                        throw new RuntimeException("A fatal error happened when 
shutdown the ResourceManager", e);
                }
@@ -115,24 +136,58 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
        /**
         * Register a {@link JobMaster} at the resource manager.
         *
-        * @param jobMasterRegistration Job master registration information
+        * @param resourceManagerLeaderId The fencing token for the 
ResourceManager leader
+        * @param jobMasterAddress        The address of the JobMaster that 
registers
+        * @param jobID                   The Job ID of the JobMaster that 
registers
         * @return Future registration response
         */
        @RpcMethod
+<<<<<<< HEAD
        public Future<RegistrationResponse> 
registerJobMaster(JobMasterRegistration jobMasterRegistration) {
                final Future<JobMasterGateway> jobMasterFuture =
                        
getRpcService().connect(jobMasterRegistration.getAddress(), 
JobMasterGateway.class);
                final JobID jobID = jobMasterRegistration.getJobID();
+=======
+       public Future<RegistrationResponse> registerJobMaster(UUID 
resourceManagerLeaderId, final String jobMasterAddress, final JobID jobID) {
+
+               if(!leaderSessionID.equals(resourceManagerLeaderId)) {
+                       log.warn("Discard registration from JobMaster {} at 
({}) because the expected leader session ID {} did not equal the received 
leader session ID  {}",
+                               jobID, jobMasterAddress, leaderSessionID, 
resourceManagerLeaderId);
+                       return Futures.failed(new 
LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+               }
+
+               Future<JobMasterGateway> jobMasterFuture = 
getRpcService().connect(jobMasterAddress, JobMasterGateway.class);
+>>>>>>> db98efb... rsourceManager registration with JobManager
 
                return jobMasterFuture.thenApplyAsync(new 
ApplyFunction<JobMasterGateway, RegistrationResponse>() {
                        @Override
                        public RegistrationResponse apply(JobMasterGateway 
jobMasterGateway) {
+<<<<<<< HEAD
                                final JobMasterGateway existingGateway = 
jobMasterGateways.put(jobID, jobMasterGateway);
                                if (existingGateway != null) {
                                        LOG.info("Replacing existing gateway {} 
for JobID {} with  {}.",
                                                existingGateway, jobID, 
jobMasterGateway);
                                }
                                return new RegistrationResponse(true);
+=======
+                               if (jobMasters.containsKey(jobID)) {
+                                       JobMasterRegistration 
jobMasterRegistration = new JobMasterRegistration(jobMasterGateway, 
jobMasters.get(jobID).getJobMasterLeaderSessionID());
+                                       jobMasters.put(jobID, 
jobMasterRegistration);
+                                       log.info("Replacing gateway for 
registered JobID {}.", jobID);
+                               } else {
+                                       JobMasterRegistration 
jobMasterRegistration = new JobMasterRegistration(jobMasterGateway);
+                                       jobMasters.put(jobID, 
jobMasterRegistration);
+                                       try {
+                                               
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).start(new 
JobMasterLeaderListener(jobID));
+                                       } catch(Throwable e) {
+                                               log.warn("Decline registration 
from JobMaster {} at ({}) because fail to get the leader retriever for the 
given job JobMaster",
+                                                       jobID, 
jobMasterAddress);
+                                               return new 
RegistrationResponse.Decline("Fail to get the leader retriever for the given 
JobMaster");
+                                       }
+                               }
+
+                               return new JobMasterRegistrationSuccess(5000);
+>>>>>>> db98efb... rsourceManager registration with JobManager
                        }
                }, getMainThreadExecutor());
        }
@@ -158,26 +213,41 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
 
 
        /**
-        *
-        * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader 
-        * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
-        * @param resourceID               The resource ID of the TaskExecutor 
that registers
-        *
+        * @param resourceManagerLeaderId The fencing token for the 
ResourceManager leader
+        * @param taskExecutorAddress     The address of the TaskExecutor that 
registers
+        * @param resourceID              The resource ID of the TaskExecutor 
that registers
         * @return The response by the ResourceManager.
         */
        @RpcMethod
-       public org.apache.flink.runtime.registration.RegistrationResponse 
registerTaskExecutor(
-                       UUID resourceManagerLeaderId,
-                       String taskExecutorAddress,
-                       ResourceID resourceID) {
+       public RegistrationResponse registerTaskExecutor(
+               UUID resourceManagerLeaderId,
+               String taskExecutorAddress,
+               ResourceID resourceID) {
 
                return new TaskExecutorRegistrationSuccess(new InstanceID(), 
5000);
        }
 
 
+<<<<<<< HEAD
        // 
------------------------------------------------------------------------
        //  Leader Contender
        // 
------------------------------------------------------------------------
+=======
+               /**
+                * Callback method invoked when the current ResourceManager loses leadership.
+                */
+               @Override
+               public void revokeLeadership() {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.info("ResourceManager {} was 
revoked leadership.", getAddress());
+                                       jobMasters.clear();
+                                       leaderSessionID = null;
+                               }
+                       });
+               }
+>>>>>>> db98efb... rsourceManager registration with JobManager
 
        /**
         * Callback method when current resourceManager is granted leadership
@@ -232,4 +302,35 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> impleme
                        }
                });
        }
+
+       private class JobMasterLeaderListener implements 
LeaderRetrievalListener {
+               private final JobID jobID;
+
+               private JobMasterLeaderListener(JobID jobID) {
+                       this.jobID = jobID;
+               }
+
+               @Override
+               public void notifyLeaderAddress(final String leaderAddress, 
final UUID leaderSessionID) {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.info("A new leader for JobMaster {} 
is elected, address is {}, leaderSessionID is {}", jobID, leaderAddress, 
leaderSessionID);
+                                       // update job master leader session id
+                                       JobMasterRegistration 
jobMasterRegistration = jobMasters.get(jobID);
+                                       
jobMasterRegistration.setJobMasterLeaderSessionID(leaderSessionID);
+                               }
+                       });
+               }
+
+               @Override
+               public void handleError(final Exception exception) {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.error("JobMasterLeaderListener 
received an error from the LeaderRetrievalService", exception);
+                               }
+                       });
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 5c8786c..1ee11a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -18,12 +18,13 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 
 import java.util.UUID;
 
@@ -35,21 +36,18 @@ public interface ResourceManagerGateway extends RpcGateway {
        /**
         * Register a {@link JobMaster} at the resource manager.
         *
-        * @param jobMasterRegistration Job master registration information
-        * @param timeout Timeout for the future to complete
+        * @param resourceManagerLeaderId The fencing token for the 
ResourceManager leader
+        * @param jobMasterAddress        The address of the JobMaster that 
registers
+        * @param jobID                   The Job ID of the JobMaster that 
registers
+        * @param timeout                 Timeout for the future to complete
         * @return Future registration response
         */
        Future<RegistrationResponse> registerJobMaster(
-               JobMasterRegistration jobMasterRegistration,
-               @RpcTimeout Time timeout);
+               UUID resourceManagerLeaderId,
+               String jobMasterAddress,
+               JobID jobID,
+                               @RpcTimeout Time timeout);
 
-       /**
-        * Register a {@link JobMaster} at the resource manager.
-        *
-        * @param jobMasterRegistration Job master registration information
-        * @return Future registration response
-        */
-       Future<RegistrationResponse> registerJobMaster(JobMasterRegistration 
jobMasterRegistration);
 
        /**
         * Requests a slot from the resource manager.
@@ -60,15 +58,13 @@ public interface ResourceManagerGateway extends RpcGateway {
        Future<SlotRequestReply> requestSlot(SlotRequest slotRequest);
 
        /**
-        * 
-        * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader 
-        * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
-        * @param resourceID               The resource ID of the TaskExecutor 
that registers
-        * @param timeout                  The timeout for the response.
-        * 
+        * @param resourceManagerLeaderId The fencing token for the 
ResourceManager leader
+        * @param taskExecutorAddress     The address of the TaskExecutor that 
registers
+        * @param resourceID              The resource ID of the TaskExecutor 
that registers
+        * @param timeout                 The timeout for the response.
         * @return The future to the response by the ResourceManager.
         */
-       Future<org.apache.flink.runtime.registration.RegistrationResponse> 
registerTaskExecutor(
+       Future<RegistrationResponse> registerTaskExecutor(
                        UUID resourceManagerLeaderId,
                        String taskExecutorAddress,
                        ResourceID resourceID,

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
new file mode 100644
index 0000000..cd14a0d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/LeaderSessionIDException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * An exception specifying that the received leader session ID is not the same as the expected one.
+ * Both IDs are included in the exception message and are passed through checkNotNull
+ * by the constructor.
+ */
+public class LeaderSessionIDException extends Exception {
+
+       private static final long serialVersionUID = -3276145308053264636L;
+
+       /** The leader session ID that was expected. */
+       private final UUID expectedLeaderSessionID;
+
+       /** The leader session ID that was actually received. */
+       private final UUID actualLeaderSessionID;
+
+       public LeaderSessionIDException(UUID expectedLeaderSessionID, UUID 
actualLeaderSessionID) {
+               super("Unmatched leader session ID : expected " + 
expectedLeaderSessionID + ", actual " + actualLeaderSessionID);
+               this.expectedLeaderSessionID =  
checkNotNull(expectedLeaderSessionID);
+               this.actualLeaderSessionID = 
checkNotNull(actualLeaderSessionID);
+       }
+
+       /**
+        * Gets the leader session ID that was expected.
+        *
+        * @return the expected leader session ID
+        */
+       public UUID getExpectedLeaderSessionID() {
+               return expectedLeaderSessionID;
+       }
+
+       /**
+        * Gets the leader session ID that was actually received.
+        *
+        * @return the actual leader session ID
+        */
+       public UUID getActualLeaderSessionID() {
+               return actualLeaderSessionID;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index d84a6a9..cf709c8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -327,6 +327,11 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        }
 
                        @Override
+                       public LeaderRetrievalService 
getJobMasterLeaderRetriever(JobID jobID) throws Exception {
+                               return null;
+                       }
+
+                       @Override
                        public LeaderElectionService 
getResourceManagerLeaderElectionService() throws Exception {
                                return null;
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 3162f40..2ac43be 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.highavailability;
 
 import org.apache.flink.api.common.JobID;
+import 
org.apache.flink.hadoop.shaded.org.jboss.netty.util.internal.ConcurrentHashMap;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 
@@ -30,6 +31,8 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
 
        private volatile LeaderRetrievalService resourceManagerLeaderRetriever;
 
+       private ConcurrentHashMap<JobID, LeaderRetrievalService> 
jobMasterLeaderRetrievers = new ConcurrentHashMap<>();
+
        private volatile LeaderElectionService jobMasterLeaderElectionService;
 
        private volatile LeaderElectionService 
resourceManagerLeaderElectionService;
@@ -43,6 +46,10 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
                this.resourceManagerLeaderRetriever = 
resourceManagerLeaderRetriever;
        }
 
+       public void setJobMasterLeaderRetriever(JobID jobID, 
LeaderRetrievalService jobMasterLeaderRetriever) {
+               this.jobMasterLeaderRetrievers.put(jobID, 
jobMasterLeaderRetriever);
+       }
+
        public void setJobMasterLeaderElectionService(LeaderElectionService 
leaderElectionService) {
                this.jobMasterLeaderElectionService = leaderElectionService;
        }
@@ -66,6 +73,16 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        }
 
        @Override
+       public LeaderRetrievalService getJobMasterLeaderRetriever(JobID jobID) 
throws Exception {
+               LeaderRetrievalService service = 
this.jobMasterLeaderRetrievers.get(jobID);
+               if (service != null) {
+                       return service;
+               } else {
+                       throw new 
IllegalStateException("JobMasterLeaderRetriever has not been set");
+               }
+       }
+
+       @Override
        public LeaderElectionService getJobMasterLeaderElectionService(JobID 
jobID) throws Exception {
                LeaderElectionService service = jobMasterLeaderElectionService;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8b2271bd/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
new file mode 100644
index 0000000..4d04001
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.api.common.JobID;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.*;
+
+public class ResourceManagerTest {
+
+       private TestingSerialRpcService rpcService;
+
+       @Before
+       public void setup() throws Exception {
+               rpcService = new TestingSerialRpcService();
+       }
+
+       @After
+       public void teardown() throws Exception {
+               rpcService.stopService();
+       }
+
+       /**
+        * Tests that a regular registration from a job master succeeds (a 
duplicate registration is not exercised by this test).
+        *
+        * @throws Exception
+        */
+       @Test
+       public void testRegisterJobMaster() throws Exception {
+               String jobMasterAddress = "/jobMasterAddress1";
+               JobID jobID = mockJobMaster(jobMasterAddress);
+               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService);
+               final UUID leaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+               // test response successful
+               Future<RegistrationResponse> successfulFuture = 
resourceManager.registerJobMaster(leaderSessionId, jobMasterAddress, jobID);
+               RegistrationResponse response = Await.result(successfulFuture, 
new FiniteDuration(0, TimeUnit.SECONDS));
+               assertTrue(response instanceof JobMasterRegistrationSuccess);
+       }
+
+       /**
+        * Tests that a job master registering with an unmatched leader session ID is rejected.
+        *
+        * @throws Exception
+        */
+       @Test(expected = LeaderSessionIDException.class)
+       public void testRegisterJobMasterWithUnmatchedLeaderSessionId() throws 
Exception {
+               String jobMasterAddress = "/jobMasterAddress1";
+               JobID jobID = mockJobMaster(jobMasterAddress);
+               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService);
+               final UUID leaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+               // expect an exception when a job master registers with a 
leaderSessionId that does not match
+               UUID differentLeaderSessionID = UUID.randomUUID();
+               Future<RegistrationResponse> unMatchedLeaderFuture = 
resourceManager.registerJobMaster(differentLeaderSessionID, jobMasterAddress, 
jobID);
+               Await.result(unMatchedLeaderFuture, new FiniteDuration(200, 
TimeUnit.MILLISECONDS));
+       }
+
+       /**
+        * Tests that a registration from a job master with an invalid address fails.
+        *
+        * @throws Exception
+        */
+       @Test(expected = Exception.class)
+       public void testRegisterJobMasterFromInvalidAddress() throws Exception {
+               String jobMasterAddress = "/jobMasterAddress1";
+               JobID jobID = mockJobMaster(jobMasterAddress);
+               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService);
+               final UUID leaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+               // expect an exception when a job master registers with an 
invalid address
+               String invalidAddress = "/jobMasterAddress2";
+               Future<RegistrationResponse> invalidAddressFuture = 
resourceManager.registerJobMaster(leaderSessionId, invalidAddress, jobID);
+               Await.result(invalidAddressFuture, new FiniteDuration(200, 
TimeUnit.MILLISECONDS));
+       }
+
+       /**
+        * Verifies that RegistrationResponse.Decline is returned when starting 
the job master leader retrieval listener fails.
+        *
+        * @throws Exception
+        */
+       @Test
+       public void testRegisterJobMasterWithFailureLeaderListener() throws 
Exception {
+               String jobMasterAddress = "/jobMasterAddress1";
+               JobID jobID = mockJobMaster(jobMasterAddress);
+               TestingLeaderElectionService 
resourceManagerLeaderElectionService = new TestingLeaderElectionService();
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService();
+               final ResourceManager resourceManager = 
createAndStartResourceManager(resourceManagerLeaderElectionService, jobID, 
jobMasterLeaderRetrievalService);
+               final UUID leaderSessionId = 
grantResourceManagerLeadership(resourceManagerLeaderElectionService);
+
+               JobID unknownJobIDToHAServices = new JobID();
+               // expect RegistrationResponse.Decline when starting the job 
master leader retrieval listener fails
+               Future<RegistrationResponse> declineFuture = 
resourceManager.registerJobMaster(leaderSessionId, jobMasterAddress, 
unknownJobIDToHAServices);
+               RegistrationResponse response = Await.result(declineFuture, new 
FiniteDuration(0, TimeUnit.SECONDS));
+               assertTrue(response instanceof RegistrationResponse.Decline);
+       }
+
+       private JobID mockJobMaster(String jobMasterAddress) {
+               JobID jobID = new JobID();
+               JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
+               rpcService.registerGateway(jobMasterAddress, jobMasterGateway);
+               return jobID;
+       }
+
+       private ResourceManager 
createAndStartResourceManager(TestingLeaderElectionService 
resourceManagerLeaderElectionService, JobID jobID, 
TestingLeaderRetrievalService jobMasterLeaderRetrievalService) {
+               TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
+               
highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
+               highAvailabilityServices.setJobMasterLeaderRetriever(jobID, 
jobMasterLeaderRetrievalService);
+               ResourceManager resourceManager = new 
ResourceManager(rpcService, highAvailabilityServices);
+               resourceManager.start();
+               return resourceManager;
+       }
+
+       private UUID 
grantResourceManagerLeadership(TestingLeaderElectionService 
resourceManagerLeaderElectionService) {
+               UUID leaderSessionId = UUID.randomUUID();
+               resourceManagerLeaderElectionService.isLeader(leaderSessionId);
+               return leaderSessionId;
+       }
+
+}

Reply via email to