[FLINK-4516] update leadership information in ResourceManager

The leadership information for connected JobMasters remained static.
This change updates it so that stale JobMasters are removed when they
lose leadership.

This closes #2624


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cef31912
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cef31912
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cef31912

Branch: refs/heads/flip-6
Commit: cef319126ced676b5c6d08e6a963986f1dd6c5ee
Parents: b380634
Author: Maximilian Michels <m...@apache.org>
Authored: Mon Oct 10 17:36:10 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:43 2016 +0200

----------------------------------------------------------------------
 .../resourcemanager/ResourceManager.java        | 196 +++++++++++++------
 .../resourcemanager/ResourceManagerGateway.java |   4 +-
 .../ResourceManagerServices.java                |   6 +
 .../registration/JobMasterRegistration.java     |  62 ++++++
 .../slotmanager/SlotManager.java                |  16 +-
 .../resourcemanager/TestingSlotManager.java     |   8 +
 .../slotmanager/SlotManagerTest.java            |  10 +-
 7 files changed, 224 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index d2d00cf..8fbb34b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
@@ -40,6 +41,7 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
 import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
+import 
org.apache.flink.runtime.resourcemanager.registration.JobMasterRegistration;
 import 
org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
@@ -53,17 +55,14 @@ import 
org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.runtime.util.LeaderConnectionInfo;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -85,10 +84,10 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        protected static final int EXIT_CODE_FATAL_ERROR = -13;
 
        /** All currently registered JobMasterGateways scoped by JobID. */
-       private final Map<JobID, JobMasterGateway> jobMasterGateways;
+       private final Map<JobID, JobMasterRegistration> jobMasters;
 
-       /** LeaderListeners for all registered JobMasters. */
-       private final Map<JobID, JobMasterLeaderListener> 
jobMasterLeaderRetrievalListeners;
+       /** LeaderListeners for all registered JobIDs. */
+       private final Map<JobID, JobIdLeaderListener> leaderListeners;
 
        /** All currently registered TaskExecutors with there framework 
specific worker information. */
        private final Map<ResourceID, WorkerRegistration<WorkerType>> 
taskExecutors;
@@ -106,7 +105,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        private LeaderElectionService leaderElectionService;
 
        /** ResourceManager's leader session id which is updated on leader 
election. */
-       private UUID leaderSessionID;
+       private volatile UUID leaderSessionID;
 
        /** All registered listeners for status updates of the ResourceManager. 
*/
        private Map<String, InfoMessageListenerRpcGateway> infoMessageListeners;
@@ -121,8 +120,8 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                super(rpcService);
                this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
                this.slotManagerFactory = checkNotNull(slotManagerFactory);
-               this.jobMasterGateways = new HashMap<>();
-               this.jobMasterLeaderRetrievalListeners = new HashMap<>();
+               this.jobMasters = new HashMap<>();
+               this.leaderListeners = new HashMap<>();
                this.taskExecutors = new HashMap<>();
                this.leaderSessionID = new UUID(0, 0);
                infoMessageListeners = new HashMap<>();
@@ -149,9 +148,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        public void shutDown() {
                try {
                        leaderElectionService.stop();
-                       for (JobID jobID : jobMasterGateways.keySet()) {
-                               
highAvailabilityServices.getJobManagerLeaderRetriever(jobID).stop();
-                       }
+                       clearState();
                        super.shutDown();
                } catch (Throwable e) {
                        log.error("A fatal error happened when shutdown the 
ResourceManager", e);
@@ -185,6 +182,24 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                checkNotNull(jobMasterAddress);
                checkNotNull(jobID);
 
+               // create a leader retriever in case it doesn't exist
+               final JobIdLeaderListener jobIdLeaderListener;
+               if (leaderListeners.containsKey(jobID)) {
+                       jobIdLeaderListener = leaderListeners.get(jobID);
+               } else {
+                       try {
+                               LeaderRetrievalService jobMasterLeaderRetriever 
=
+                                       
highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
+                               jobIdLeaderListener = new 
JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
+                       } catch (Exception e) {
+                               log.warn("Failed to start 
JobMasterLeaderRetriever for JobID {}", jobID);
+                               FlinkCompletableFuture<RegistrationResponse> 
responseFuture = new FlinkCompletableFuture<>();
+                               responseFuture.complete(new 
RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever"));
+                               return responseFuture;
+                       }
+                       leaderListeners.put(jobID, jobIdLeaderListener);
+               }
+
                return getRpcService()
                        .execute(new Callable<JobMasterGateway>() {
                                @Override
@@ -197,21 +212,13 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                                                throw new Exception("Invalid 
leader session id");
                                        }
 
-                                       final LeaderConnectionInfo 
jobMasterLeaderInfo;
-                                       try {
-                                               jobMasterLeaderInfo = 
LeaderRetrievalUtils.retrieveLeaderConnectionInfo(
-                                                       
highAvailabilityServices.getJobManagerLeaderRetriever(jobID), new 
FiniteDuration(5, TimeUnit.SECONDS));
-                                       } catch (Exception e) {
-                                               log.warn("Failed to start 
JobMasterLeaderRetriever for JobID {}", jobID);
-                                               throw new Exception("Failed to 
retrieve JobMasterLeaderRetriever");
-                                       }
-
-                                       if 
(!jobMasterLeaderId.equals(jobMasterLeaderInfo.getLeaderSessionID())) {
-                                               log.info("Declining 
registration request from non-leading JobManager {}", jobMasterAddress);
-                                               throw new Exception("JobManager 
is not leading");
+                                       if 
(!jobIdLeaderListener.getLeaderID().get(timeout.getSize(), timeout.getUnit())
+                                                       
.equals(jobMasterLeaderId)) {
+                                               throw new Exception("Leader Id 
did not match");
                                        }
 
-                                       return 
getRpcService().connect(jobMasterAddress, JobMasterGateway.class).get(5, 
TimeUnit.SECONDS);
+                                       return 
getRpcService().connect(jobMasterAddress, JobMasterGateway.class)
+                                               .get(timeout.getSize(), 
timeout.getUnit());
                                }
                        })
                        .handleAsync(new BiFunction<JobMasterGateway, 
Throwable, RegistrationResponse>() {
@@ -220,24 +227,34 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
 
                                        if (throwable != null) {
                                                return new 
RegistrationResponse.Decline(throwable.getMessage());
-                                       } else {
-                                               if 
(!jobMasterLeaderRetrievalListeners.containsKey(jobID)) {
-                                                       JobMasterLeaderListener 
jobMasterLeaderListener = new JobMasterLeaderListener(jobID);
-                                                       try {
-                                                               
LeaderRetrievalService jobMasterLeaderRetriever = 
highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
-                                                               
jobMasterLeaderRetriever.start(jobMasterLeaderListener);
-                                                       } catch (Exception e) {
-                                                               
log.warn("Failed to start JobMasterLeaderRetriever for JobID {}", jobID);
-                                                               return new 
RegistrationResponse.Decline("Failed to retrieve JobMasterLeaderRetriever");
-                                                       }
-                                                       
jobMasterLeaderRetrievalListeners.put(jobID, jobMasterLeaderListener);
-                                               }
-                                               final JobMasterGateway 
existingGateway = jobMasterGateways.put(jobID, jobMasterGateway);
-                                               if (existingGateway != null) {
-                                                       log.info("Replacing 
gateway for registered JobID {}.", jobID);
+                                       }
+
+                                       if 
(!leaderSessionID.equals(resourceManagerLeaderId)) {
+                                               log.warn("Discard registration 
from JobMaster {} at ({}) because the expected leader session ID {}" +
+                                                               " did not equal 
the received leader session ID  {}",
+                                                       jobID, 
jobMasterAddress, leaderSessionID, resourceManagerLeaderId);
+                                               return new 
RegistrationResponse.Decline("Invalid leader session id");
+                                       }
+
+                                       try {
+                                               // LeaderID should be available 
now, but if not we fail the registration
+                                               UUID currentJobMasterLeaderId = 
jobIdLeaderListener.getLeaderID().getNow(null);
+                                               if (currentJobMasterLeaderId == 
null || !currentJobMasterLeaderId.equals(jobMasterLeaderId)) {
+                                                       throw new 
Exception("Leader Id did not match");
                                                }
-                                               return new 
JobMasterRegistrationSuccess(5000, resourceManagerLeaderId);
+                                       } catch (Exception e) {
+                                               return new 
RegistrationResponse.Decline(e.getMessage());
+                                       }
+
+                                       final JobMasterRegistration 
registration =
+                                               new 
JobMasterRegistration(jobID, jobMasterLeaderId, jobMasterGateway);
+
+                                       final JobMasterRegistration 
existingRegistration = jobMasters.put(jobID, registration);
+                                       if (existingRegistration != null) {
+                                               log.info("Replacing JobMaster 
registration for newly registered JobMaster with JobID {}.", jobID);
                                        }
+                                       return new 
JobMasterRegistrationSuccess(5000, resourceManagerLeaderId);
+
                                }
                        }, getMainThreadExecutor());
        }
@@ -305,13 +322,10 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                        SlotRequest slotRequest) {
 
                JobID jobId = slotRequest.getJobId();
-               JobMasterGateway jobMasterGateway = 
jobMasterGateways.get(jobId);
-               JobMasterLeaderListener jobMasterLeaderListener = 
jobMasterLeaderRetrievalListeners.get(jobId);
+               JobMasterRegistration jobMasterRegistration = 
jobMasters.get(jobId);
 
-               UUID leaderID = jobMasterLeaderListener.getLeaderID();
-
-               if (jobMasterGateway != null
-                               && jobMasterLeaderID.equals(leaderID)
+               if (jobMasterRegistration != null
+                               && 
jobMasterLeaderID.equals(jobMasterRegistration.getLeaderID())
                                && 
resourceManagerLeaderID.equals(leaderSessionID)) {
                        return slotManager.requestSlot(slotRequest);
                } else {
@@ -371,8 +385,6 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                                log.info("ResourceManager {} was granted 
leadership with leader session ID {}", getAddress(), leaderSessionID);
                                // confirming the leader session ID might be 
blocking,
                                
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
-                               // notify SlotManager
-                               slotManager.setLeaderUUID(leaderSessionID);
                                ResourceManager.this.leaderSessionID = 
leaderSessionID;
                        }
                });
@@ -387,10 +399,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                        @Override
                        public void run() {
                                log.info("ResourceManager {} was revoked 
leadership.", getAddress());
-                               jobMasterGateways.clear();
-                               taskExecutors.clear();
-                               slotManager.clearState();
-                               leaderSessionID = new UUID(0, 0);
+                               clearState();
                        }
                });
        }
@@ -577,6 +586,11 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        private class DefaultResourceManagerServices implements 
ResourceManagerServices {
 
                @Override
+               public UUID getLeaderID() {
+                       return ResourceManager.this.leaderSessionID;
+               }
+
+               @Override
                public void allocateResource(ResourceProfile resourceProfile) {
                        ResourceManager.this.startNewWorker(resourceProfile);
                }
@@ -592,33 +606,95 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                }
        }
 
-       private static class JobMasterLeaderListener implements LeaderRetrievalListener {
+       /**
+        * Leader retrieval listener created for each JobID a JobMaster tries to register for.
+        *
+        * <p>It keeps track of the JobMaster leader id of that job. When a new leader id is
+        * reported and an existing registration carries a different leader id, that registration
+        * is stale: it is removed and this listener is stopped and removed as well.
+        */
+       private class JobIdLeaderListener implements LeaderRetrievalListener {
 
                private final JobID jobID;
-               private UUID leaderID;
+               private final LeaderRetrievalService retrievalService;
 
-               private JobMasterLeaderListener(JobID jobID) {
+               /** Completed with the first leader id reported to this listener. */
+               private final FlinkCompletableFuture<UUID> initialLeaderIdFuture;
+
+               /** Most recently reported leader id; updated by notifyLeaderAddress. */
+               private volatile UUID leaderID;
+
+               /**
+                * Creates the listener and immediately starts the given retrieval service with it.
+                *
+                * @param jobID job whose JobMaster leader is tracked
+                * @param retrievalService leader retrieval service for that job
+                * @throws Exception if starting the retrieval service fails
+                */
+               private JobIdLeaderListener(
+                               JobID jobID,
+                               LeaderRetrievalService retrievalService) throws Exception {
                        this.jobID = jobID;
+                       this.retrievalService = retrievalService;
+                       this.initialLeaderIdFuture = new FlinkCompletableFuture<>();
+                       this.retrievalService.start(this);
+               }
+
+               /**
+                * Returns the JobMaster leader id of this job: the pending initial future until the
+                * first notification arrives, afterwards a completed future with the latest id.
+                */
+               public Future<UUID> getLeaderID() {
+                       if (!initialLeaderIdFuture.isDone()) {
+                               return initialLeaderIdFuture;
+                       } else {
+                               return FlinkCompletableFuture.completed(leaderID);
+                       }
                }
 
                public JobID getJobID() {
                        return jobID;
                }
 
-               public UUID getLeaderID() {
-                       return leaderID;
+               /** Stops the underlying leader retrieval service. */
+               public void stopService() throws Exception {
+                       retrievalService.stop();
                }
 
                @Override
                public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
                        this.leaderID = leaderSessionID;
+
+                       if (!initialLeaderIdFuture.isDone()) {
+                               initialLeaderIdFuture.complete(leaderSessionID);
+                       }
+
+                       // apply the bookkeeping update to the ResourceManager's maps via runAsync
+                       ResourceManager.this.runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       JobMasterRegistration jobMasterRegistration = ResourceManager.this.jobMasters.get(jobID);
+                                       // Only an existing registration with a different leader id is stale. Without a
+                                       // registration, a registration for this job may still be in progress (it waits
+                                       // for the initial leader id of this listener), so the listener must be kept.
+                                       if (jobMasterRegistration != null && !jobMasterRegistration.getLeaderID().equals(leaderSessionID)) {
+                                               // registration is not valid anymore, remove registration
+                                               ResourceManager.this.jobMasters.remove(jobID);
+                                               // leader listener is not necessary anymore
+                                               JobIdLeaderListener listener = ResourceManager.this.leaderListeners.remove(jobID);
+                                               if (listener != null) {
+                                                       try {
+                                                               listener.stopService();
+                                                       } catch (Exception e) {
+                                                               ResourceManager.this.handleError(e);
+                                                       }
+                                               }
+                                       }
+                               }
+                       });
                }
 
                @Override
                public void handleError(final Exception exception) {
-                       // TODO
+                       ResourceManager.this.handleError(exception);
                }
        }
 
+       /**
+        * Resets the ResourceManager's bookkeeping: drops all JobMaster and TaskExecutor
+        * registrations, clears the SlotManager, stops and removes every JobID leader listener
+        * (failures while stopping are reported via handleError) and sets the leader session id
+        * back to the zero UUID.
+        */
+       private void clearState() {
+               jobMasters.clear();
+               taskExecutors.clear();
+               slotManager.clearState();
+
+               for (Iterator<JobIdLeaderListener> it = leaderListeners.values().iterator(); it.hasNext(); ) {
+                       final JobIdLeaderListener jobIdListener = it.next();
+                       try {
+                               jobIdListener.stopService();
+                       } catch (Exception e) {
+                               handleError(e);
+                       }
+                       it.remove();
+               }
+
+               leaderSessionID = new UUID(0, 0);
+       }
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 3c81227..07e9e43 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -61,14 +61,14 @@ public interface ResourceManagerGateway extends RpcGateway {
        /**
         * Requests a slot from the resource manager.
         *
-        * @param jobMasterLeaderID leader id of the JobMaster
         * @param resourceManagerLeaderID leader if of the ResourceMaster
+        * @param jobMasterLeaderID leader id of the JobMaster
         * @param slotRequest The slot to request
+        * @param timeout Timeout for the RPC call
         * @return The confirmation that the slot gets allocated
         */
        Future<RMSlotRequestReply> requestSlot(
-               UUID jobMasterLeaderID,
                UUID resourceManagerLeaderID,
+               UUID jobMasterLeaderID,
                SlotRequest slotRequest,
                @RpcTimeout Time timeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
index b997a3a..16d0a7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerServices.java
@@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 
+import java.util.UUID;
 import java.util.concurrent.Executor;
 
 /**
@@ -27,6 +28,11 @@ import java.util.concurrent.Executor;
 public interface ResourceManagerServices {
 
        /**
+        * Gets the current leader id assigned at the ResourceManager.
+        *
+        * @return the ResourceManager's current leader session id
+        */
+       UUID getLeaderID();
+
+       /**
         * Allocates a resource according to the resource profile.
         */
        void allocateResource(ResourceProfile resourceProfile);

http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java
new file mode 100644
index 0000000..f417935
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/registration/JobMasterRegistration.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.registration;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+
+import java.util.UUID;
+
+/**
+ * Groups the {@link JobMasterGateway} of a registered JobMaster with the id of its job and
+ * the JobMaster leader id it registered with.
+ *
+ * <p>All fields are final. The class is not {@code Serializable}; it is only kept as
+ * in-memory bookkeeping, so no {@code serialVersionUID} is declared.
+ */
+public class JobMasterRegistration {
+
+       private final JobID jobID;
+
+       private final UUID leaderID;
+
+       private final JobMasterGateway jobMasterGateway;
+
+       /**
+        * Creates a new registration.
+        *
+        * @param jobID id of the job the JobMaster is responsible for
+        * @param leaderID leader id the JobMaster registered with
+        * @param jobMasterGateway gateway to communicate with the JobMaster
+        */
+       public JobMasterRegistration(
+                       JobID jobID,
+                       UUID leaderID,
+                       JobMasterGateway jobMasterGateway) {
+               this.jobID = jobID;
+               this.leaderID = leaderID;
+               this.jobMasterGateway = jobMasterGateway;
+       }
+
+       /** Returns the id of the job the registered JobMaster is responsible for. */
+       public JobID getJobID() {
+               return jobID;
+       }
+
+       /** Returns the leader id the JobMaster registered with. */
+       public UUID getLeaderID() {
+               return leaderID;
+       }
+
+       /** Returns the gateway of the registered JobMaster. */
+       public JobMasterGateway getJobMasterGateway() {
+               return jobMasterGateway;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 7eb2d78..e312ea2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -41,7 +41,6 @@ import org.slf4j.LoggerFactory;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.UUID;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -85,9 +84,6 @@ public abstract class SlotManager {
 
        private final Time timeout;
 
-       /** The current leader id set by the ResourceManager */
-       private UUID leaderID;
-
        public SlotManager(ResourceManagerServices rmServices) {
                this.rmServices = checkNotNull(rmServices);
                this.registeredSlots = new HashMap<>(16);
@@ -96,7 +92,6 @@ public abstract class SlotManager {
                this.allocationMap = new AllocationMap();
                this.taskManagers = new HashMap<>();
                this.timeout = Time.seconds(10);
-               this.leaderID = new UUID(0, 0);
        }
 
        // 
------------------------------------------------------------------------
@@ -303,7 +298,7 @@ public abstract class SlotManager {
                final TaskExecutorRegistration registration = 
freeSlot.getTaskExecutorRegistration();
                final Future<TMSlotRequestReply> slotRequestReplyFuture =
                        registration.getTaskExecutorGateway()
-                               .requestSlot(freeSlot.getSlotId(), 
allocationID, leaderID, timeout);
+                               .requestSlot(freeSlot.getSlotId(), 
allocationID, rmServices.getLeaderID(), timeout);
 
                slotRequestReplyFuture.handleAsync(new 
BiFunction<TMSlotRequestReply, Throwable, Void>() {
                        @Override
@@ -488,15 +483,6 @@ public abstract class SlotManager {
                pendingSlotRequests.clear();
                freeSlots.clear();
                allocationMap.clear();
-               leaderID = new UUID(0, 0);
-       }
-
-       // 
------------------------------------------------------------------------
-       //  High availability (called by the ResourceManager)
-       // 
------------------------------------------------------------------------
-
-       public void setLeaderUUID(UUID leaderSessionID) {
-               this.leaderID = leaderSessionID;
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
index 0b2c42b..67b208d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
@@ -26,6 +26,7 @@ import org.mockito.Mockito;
 
 import java.util.Iterator;
 import java.util.Map;
+import java.util.UUID;
 import java.util.concurrent.Executor;
 
 public class TestingSlotManager extends SlotManager {
@@ -60,6 +61,13 @@ public class TestingSlotManager extends SlotManager {
 
        private static class TestingResourceManagerServices implements 
ResourceManagerServices {
 
+               private final UUID leaderID = UUID.randomUUID();
+
+               @Override
+               public UUID getLeaderID() {
+                       return leaderID;
+               }
+
                @Override
                public void allocateResource(ResourceProfile resourceProfile) {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cef31912/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 0d2b40d..558d3c2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -498,13 +498,21 @@ public class SlotManagerTest {
 
                private static class TestingRmServices implements 
ResourceManagerServices {
 
-                       private List<ResourceProfile> allocatedContainers;
+                       private final UUID leaderID;
+
+                       private final List<ResourceProfile> allocatedContainers;
 
                        public TestingRmServices() {
+                               this.leaderID = UUID.randomUUID();
                                this.allocatedContainers = new LinkedList<>();
                        }
 
                        @Override
+                       public UUID getLeaderID() {
+                               return leaderID;
+                       }
+
+                       @Override
                        public void allocateResource(ResourceProfile 
resourceProfile) {
                                allocatedContainers.add(resourceProfile);
                        }

Reply via email to