[FLINK-4537] rebase and refine

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b905fdbf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b905fdbf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b905fdbf

Branch: refs/heads/flip-6
Commit: b905fdbf07433158b8bf1c6d2b5a172482e3c418
Parents: 8b2271b
Author: Maximilian Michels <m...@apache.org>
Authored: Wed Sep 21 14:13:12 2016 +0200
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Sep 21 21:56:58 2016 +0200

----------------------------------------------------------------------
 .../resourcemanager/JobMasterRegistration.java  |  52 +++---
 .../resourcemanager/ResourceManager.java        | 165 ++++++++-----------
 .../slotmanager/SlotManager.java                |  29 +++-
 3 files changed, 110 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b905fdbf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
index 7b8ec70..981441f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/JobMasterRegistration.java
@@ -18,59 +18,47 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-<<<<<<< HEAD
 import org.apache.flink.api.common.JobID;
-=======
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
->>>>>>> db98efb... rsourceManager registration with JobManager
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 
-import java.io.Serializable;
 import java.util.UUID;
 
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
 /**
  * This class is responsible for group the JobMasterGateway and the 
LeaderSessionID of a registered job master
  */
-public class JobMasterRegistration implements Serializable {
+public class JobMasterRegistration implements LeaderRetrievalListener {
 
-<<<<<<< HEAD
-       private final String address;
+       private final JobMasterGateway gateway;
        private final JobID jobID;
+       private final UUID leaderSessionID;
+       private LeaderRetrievalListener retriever;
 
-       public JobMasterRegistration(String address, JobID jobID) {
-               this.address = address;
+       public JobMasterRegistration(JobMasterGateway gateway, JobID jobID, 
UUID leaderSessionID) {
+               this.gateway = gateway;
                this.jobID = jobID;
-=======
-       private static final long serialVersionUID = -2316627821716999527L;
-
-       private final JobMasterGateway jobMasterGateway;
-
-       private UUID jobMasterLeaderSessionID;
-
-       public JobMasterRegistration(JobMasterGateway jobMasterGateway) {
-               this.jobMasterGateway = checkNotNull(jobMasterGateway);
+               this.leaderSessionID = leaderSessionID;
        }
 
-       public JobMasterRegistration(JobMasterGateway jobMasterGateway, UUID 
jobMasterLeaderSessionID) {
-               this.jobMasterGateway = checkNotNull(jobMasterGateway);
-               this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
+       public JobMasterGateway getGateway() {
+               return gateway;
        }
 
-       public JobMasterGateway getJobMasterGateway() {
-               return jobMasterGateway;
+       public UUID getLeaderSessionID() {
+               return leaderSessionID;
        }
 
-       public void setJobMasterLeaderSessionID(UUID leaderSessionID) {
-               this.jobMasterLeaderSessionID = jobMasterLeaderSessionID;
->>>>>>> db98efb... rsourceManager registration with JobManager
+       public JobID getJobID() {
+               return jobID;
        }
 
-       public UUID getJobMasterLeaderSessionID() {
-               return jobMasterLeaderSessionID;
+       @Override
+       public void notifyLeaderAddress(String leaderAddress, UUID 
leaderSessionID) {
+               
        }
 
-       public JobID getJobID() {
-               return jobID;
+       @Override
+       public void handleError(Exception exception) {
+
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b905fdbf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 8be1455..aae4874 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.resourcemanager;
 
-import akka.dispatch.Futures;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
@@ -29,26 +28,31 @@ import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderContender;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-<<<<<<< HEAD
-import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
-=======
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
->>>>>>> db98efb... rsourceManager registration with JobManager
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 
+import org.apache.flink.runtime.concurrent.Future;
+
+import org.apache.flink.runtime.util.LeaderConnectionInfo;
+import org.apache.flink.runtime.util.LeaderRetrievalUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.concurrent.duration.FiniteDuration;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -62,17 +66,13 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *     <li>{@link #requestSlot(SlotRequest)} requests a slot from the resource 
manager</li>
  * </ul>
  */
-<<<<<<< HEAD
 public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> 
implements LeaderContender {
 
        private final Logger LOG = LoggerFactory.getLogger(getClass());
 
        private final Map<JobID, JobMasterGateway> jobMasterGateways;
-=======
-public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
-       /** the mapping relationship of JobID and JobMasterGateway */
-       private final Map<JobID, JobMasterRegistration> jobMasters;
->>>>>>> db98efb... rsourceManager registration with JobManager
+
+       private final Set<LeaderRetrievalListener> 
jobMasterLeaderRetrievalListeners;
 
        private final HighAvailabilityServices highAvailabilityServices;
 
@@ -88,12 +88,9 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
                        SlotManager slotManager) {
                super(rpcService);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
-<<<<<<< HEAD
                this.jobMasterGateways = new HashMap<>();
                this.slotManager = slotManager;
-=======
-               this.jobMasters = new HashMap<>(16);
->>>>>>> db98efb... rsourceManager registration with JobManager
+               this.jobMasterLeaderRetrievalListeners = new HashSet<>();
        }
 
        @Override
@@ -113,7 +110,7 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
        public void shutDown() {
                try {
                        leaderElectionService.stop();
-                       for(JobID jobID : jobMasters.keySet()) {
+                       for(JobID jobID : jobMasterGateways.keySet()) {
                                
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).stop();
                        }
                        super.shutDown();
@@ -142,52 +139,64 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
         * @return Future registration response
         */
        @RpcMethod
-<<<<<<< HEAD
-       public Future<RegistrationResponse> 
registerJobMaster(JobMasterRegistration jobMasterRegistration) {
-               final Future<JobMasterGateway> jobMasterFuture =
-                       
getRpcService().connect(jobMasterRegistration.getAddress(), 
JobMasterGateway.class);
-               final JobID jobID = jobMasterRegistration.getJobID();
-=======
-       public Future<RegistrationResponse> registerJobMaster(UUID 
resourceManagerLeaderId, final String jobMasterAddress, final JobID jobID) {
+       public Future<RegistrationResponse> registerJobMaster(
+               final UUID resourceManagerLeaderId, final UUID 
jobMasterLeaderId,
+               final String jobMasterAddress, final JobID jobID) {
+
+               checkNotNull(resourceManagerLeaderId);
+               checkNotNull(jobMasterAddress);
+               checkNotNull(jobID);
+
+               // TODO mxm The leader retrieval needs to be split up in an 
async part which runs outside the main execution thread
+               // The state updates should be performed inside the main thread
+
+               final FlinkCompletableFuture<RegistrationResponse> future = new 
FlinkCompletableFuture<>();
 
                if(!leaderSessionID.equals(resourceManagerLeaderId)) {
-                       log.warn("Discard registration from JobMaster {} at 
({}) because the expected leader session ID {} did not equal the received 
leader session ID  {}",
+                       log.warn("Discard registration from JobMaster {} at 
({}) because the expected leader session ID {}" +
+                                       " did not equal the received leader 
session ID  {}",
                                jobID, jobMasterAddress, leaderSessionID, 
resourceManagerLeaderId);
-                       return Futures.failed(new 
LeaderSessionIDException(leaderSessionID, resourceManagerLeaderId));
+                       future.complete(new 
RegistrationResponse.Decline("Invalid leader session id"));
+                       return future;
                }
 
-               Future<JobMasterGateway> jobMasterFuture = 
getRpcService().connect(jobMasterAddress, JobMasterGateway.class);
->>>>>>> db98efb... rsourceManager registration with JobManager
+               final LeaderConnectionInfo jobMasterLeaderInfo;
+               try {
+                       jobMasterLeaderInfo = 
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
+                               
highAvailabilityServices.getJobMasterLeaderRetriever(jobID), new 
FiniteDuration(5, TimeUnit.SECONDS));
+               } catch (Exception e) {
+                       LOG.warn("Failed to start JobMasterLeaderRetriever for 
JobID {}", jobID);
+                       future.complete(new 
RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
+                       return future;
+               }
+
+               if 
(!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
+                       LOG.info("Declining registration request from 
non-leading JobManager {}", jobMasterAddress);
+                       future.complete(new 
RegistrationResponse.Decline("JobManager is not leading"));
+                       return future;
+               }
 
-               return jobMasterFuture.thenApplyAsync(new 
ApplyFunction<JobMasterGateway, RegistrationResponse>() {
+               Future<JobMasterGateway> jobMasterGatewayFuture =
+                       getRpcService().connect(jobMasterAddress, 
JobMasterGateway.class);
+
+               return jobMasterGatewayFuture.thenApplyAsync(new 
ApplyFunction<JobMasterGateway, RegistrationResponse>() {
                        @Override
                        public RegistrationResponse apply(JobMasterGateway 
jobMasterGateway) {
-<<<<<<< HEAD
+
+                               final JobMasterLeaderListener 
jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
+                               try {
+                                       LeaderRetrievalService 
jobMasterLeaderRetriever = 
highAvailabilityServices.getJobMasterLeaderRetriever(jobID);
+                                       
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
+                               } catch (Exception e) {
+                                       LOG.warn("Failed to start 
JobMasterLeaderRetriever for JobID {}", jobID);
+                                       return new 
RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
+                               }
+                               
jobMasterLeaderRetrievalListeners.add(jobMasterLeaderListener);
                                final JobMasterGateway existingGateway = 
jobMasterGateways.put(jobID, jobMasterGateway);
                                if (existingGateway != null) {
-                                       LOG.info("Replacing existing gateway {} 
for JobID {} with  {}.",
-                                               existingGateway, jobID, 
jobMasterGateway);
-                               }
-                               return new RegistrationResponse(true);
-=======
-                               if (jobMasters.containsKey(jobID)) {
-                                       JobMasterRegistration 
jobMasterRegistration = new JobMasterRegistration(jobMasterGateway, 
jobMasters.get(jobID).getJobMasterLeaderSessionID());
-                                       jobMasters.put(jobID, 
jobMasterRegistration);
                                        log.info("Replacing gateway for 
registered JobID {}.", jobID);
-                               } else {
-                                       JobMasterRegistration 
jobMasterRegistration = new JobMasterRegistration(jobMasterGateway);
-                                       jobMasters.put(jobID, 
jobMasterRegistration);
-                                       try {
-                                               
highAvailabilityServices.getJobMasterLeaderRetriever(jobID).start(new 
JobMasterLeaderListener(jobID));
-                                       } catch(Throwable e) {
-                                               log.warn("Decline registration 
from JobMaster {} at ({}) because fail to get the leader retriever for the 
given job JobMaster",
-                                                       jobID, 
jobMasterAddress);
-                                               return new 
RegistrationResponse.Decline("Fail to get the leader retriever for the given 
JobMaster");
-                                       }
                                }
-
                                return new JobMasterRegistrationSuccess(5000);
->>>>>>> db98efb... rsourceManager registration with JobManager
                        }
                }, getMainThreadExecutor());
        }
@@ -228,26 +237,9 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
        }
 
 
-<<<<<<< HEAD
        // 
------------------------------------------------------------------------
        //  Leader Contender
        // 
------------------------------------------------------------------------
-=======
-               /**
-                * Callback method when current resourceManager lose leadership.
-                */
-               @Override
-               public void revokeLeadership() {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       log.info("ResourceManager {} was 
revoked leadership.", getAddress());
-                                       jobMasters.clear();
-                                       leaderSessionID = null;
-                               }
-                       });
-               }
->>>>>>> db98efb... rsourceManager registration with JobManager
 
        /**
         * Callback method when current resourceManager is granted leadership
@@ -263,7 +255,7 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
                                // confirming the leader session ID might be 
blocking,
                                
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
                                // notify SlotManager
-                               slotManager.notifyLeaderAddress(getAddress(), 
leaderSessionID);
+                               slotManager.setLeaderUUID(leaderSessionID);
                                ResourceManager.this.leaderSessionID = 
leaderSessionID;
                        }
                });
@@ -279,7 +271,8 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
                        public void run() {
                                log.info("ResourceManager {} was revoked 
leadership.", getAddress());
                                jobMasterGateways.clear();
-                               ResourceManager.this.leaderSessionID = null;
+                               slotManager.clearState();
+                               leaderSessionID = null;
                        }
                });
        }
@@ -291,20 +284,15 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
         */
        @Override
        public void handleError(final Exception exception) {
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               log.error("ResourceManager received an error 
from the LeaderElectionService.", exception);
-                               // notify SlotManager
-                               slotManager.handleError(exception);
-                               // terminate ResourceManager in case of an error
-                               shutDown();
-                       }
-               });
+               log.error("ResourceManager received an error from the 
LeaderElectionService.", exception);
+               // terminate ResourceManager in case of an error
+               shutDown();
        }
 
-       private class JobMasterLeaderListener implements 
LeaderRetrievalListener {
+       private static class JobMasterLeaderListener implements 
LeaderRetrievalListener {
+
                private final JobID jobID;
+               private UUID leaderID;
 
                private JobMasterLeaderListener(JobID jobID) {
                        this.jobID = jobID;
@@ -312,25 +300,12 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
 
                @Override
                public void notifyLeaderAddress(final String leaderAddress, 
final UUID leaderSessionID) {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       log.info("A new leader for JobMaster {} 
is elected, address is {}, leaderSessionID is {}", jobID, leaderAddress, 
leaderSessionID);
-                                       // update job master leader session id
-                                       JobMasterRegistration 
jobMasterRegistration = jobMasters.get(jobID);
-                                       
jobMasterRegistration.setJobMasterLeaderSessionID(leaderSessionID);
-                               }
-                       });
+                       this.leaderID = leaderSessionID;
                }
 
                @Override
                public void handleError(final Exception exception) {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       log.error("JobMasterLeaderListener 
received an error from the LeaderRetrievalService", exception);
-                               }
-                       });
+                       // TODO
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b905fdbf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 97176b2..5d0013c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -59,7 +59,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * </ul>
  * <b>IMPORTANT:</b> This class is <b>Not Thread-safe</b>.
  */
-public abstract class SlotManager implements LeaderRetrievalListener {
+public abstract class SlotManager {
 
        protected final Logger LOG = LoggerFactory.getLogger(getClass());
 
@@ -514,22 +514,33 @@ public abstract class SlotManager implements 
LeaderRetrievalListener {
                public int size() {
                        return allocatedSlots.size();
                }
+
+               public void clear() {
+                       allocatedSlots.clear();
+                       allocatedSlotsByAllocationId.clear();
+               }
+       }
+
+       /**
+        * Clears the state of the SlotManager after leadership revocation
+        */
+       public void clearState() {
+               taskManagerGateways.clear();
+               registeredSlots.clear();
+               pendingSlotRequests.clear();
+               freeSlots.clear();
+               allocationMap.clear();
+               leaderID = null;
        }
 
        // 
------------------------------------------------------------------------
-       //  High availability
+       //  High availability (called by the ResourceManager)
        // 
------------------------------------------------------------------------
 
-       @Override
-       public void notifyLeaderAddress(String leaderAddress, UUID 
leaderSessionID) {
+       public void setLeaderUUID(UUID leaderSessionID) {
                this.leaderID = leaderSessionID;
        }
 
-       @Override
-       public void handleError(Exception exception) {
-               LOG.error("Slot Manager received an error from the leader 
service", exception);
-       }
-
        // 
------------------------------------------------------------------------
        //  Testing utilities
        // 
------------------------------------------------------------------------

Reply via email to