[FLINK-7504] Fence the ResourceManager

Properly fences the ResourceManager by letting it extend the FencedRpcEndpoint.
Moreover, this PR introduces a ResourceManagerId which replaces the UUID as the
leader id/fencing token. This gives us more type safety when defining RPC
interfaces.

This closes #4582.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e70de0eb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e70de0eb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e70de0eb

Branch: refs/heads/master
Commit: e70de0eb8f2f2a89cf74aa4bc6d1501e07e22d43
Parents: 84c2a92
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Aug 24 15:11:00 2017 +0200
Committer: Till <till.rohrm...@gmail.com>
Committed: Mon Sep 4 13:44:01 2017 +0200

----------------------------------------------------------------------
 .../MesosResourceManagerTest.java               |   7 +-
 .../apache/flink/runtime/instance/SlotPool.java |  10 +-
 .../flink/runtime/instance/SlotPoolGateway.java |   4 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  49 +--
 .../runtime/jobmaster/JobMasterGateway.java     |   5 +-
 .../jobmaster/JobMasterRegistrationSuccess.java |  15 +-
 .../flink/runtime/minicluster/MiniCluster.java  |   5 +-
 .../registration/RetryingRegistration.java      |  12 +-
 .../resourcemanager/ResourceManager.java        | 314 +++++++------------
 .../resourcemanager/ResourceManagerGateway.java |  38 +--
 .../resourcemanager/ResourceManagerId.java      |  58 ++++
 .../slotmanager/SlotManager.java                |  22 +-
 .../runtime/taskexecutor/JobLeaderService.java  |   7 +-
 .../runtime/taskexecutor/TaskExecutor.java      |  35 +--
 .../taskexecutor/TaskExecutorGateway.java       |   8 +-
 ...TaskExecutorToResourceManagerConnection.java |  20 +-
 .../clusterframework/ResourceManagerTest.java   |   7 +-
 .../flink/runtime/instance/SlotPoolTest.java    |  14 +-
 .../flink/runtime/jobmaster/JobMasterTest.java  |   9 +-
 .../runtime/metrics/MetricRegistryTest.java     |   6 +-
 .../resourcemanager/ResourceManagerHATest.java  |  37 ++-
 .../ResourceManagerJobMasterTest.java           |  68 ++--
 .../ResourceManagerTaskExecutorTest.java        |  40 ++-
 .../slotmanager/SlotManagerTest.java            | 115 ++++---
 .../slotmanager/SlotProtocolTest.java           |  14 +-
 .../taskexecutor/TaskExecutorITCase.java        |   3 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  |  48 ++-
 27 files changed, 462 insertions(+), 508 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
index cf0c913..02b043e 100644
--- 
a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
+++ 
b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import 
org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
@@ -326,7 +327,7 @@ public class MesosResourceManagerTest extends TestLogger {
                                                
slotManagerStarted.complete(true);
                                                return null;
                                        }
-                               }).when(slotManager).start(any(UUID.class), 
any(Executor.class), any(ResourceManagerActions.class));
+                               
}).when(slotManager).start(any(ResourceManagerId.class), any(Executor.class), 
any(ResourceManagerActions.class));
 
                                
when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenReturn(true);
                        }
@@ -441,7 +442,7 @@ public class MesosResourceManagerTest extends TestLogger {
                 */
                public void registerJobMaster(MockJobMaster jobMaster) throws 
Exception  {
                        CompletableFuture<RegistrationResponse> registration = 
resourceManager.registerJobManager(
-                               rmServices.rmLeaderSessionId, 
jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, 
jobMaster.jobID, timeout);
+                               jobMaster.leaderSessionID, 
jobMaster.resourceID, jobMaster.address, jobMaster.jobID, timeout);
                        assertTrue(registration.get() instanceof 
JobMasterRegistrationSuccess);
                }
 
@@ -617,7 +618,7 @@ public class MesosResourceManagerTest extends TestLogger {
 
                        // send registration message
                        CompletableFuture<RegistrationResponse> 
successfulFuture =
-                               
resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, 
task1Executor.address, task1Executor.resourceID, slotReport, timeout);
+                               
resourceManager.registerTaskExecutor(task1Executor.address, 
task1Executor.resourceID, slotReport, timeout);
                        RegistrationResponse response = 
successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
                        assertTrue(response instanceof 
TaskExecutorRegistrationSuccess);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index bf3de25..326e3a2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -121,9 +121,6 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
        /** the leader id of job manager */
        private UUID jobManagerLeaderId;
 
-       /** The leader id of resource manager */
-       private UUID resourceManagerLeaderId;
-
        /** The gateway to communicate with resource manager */
        private ResourceManagerGateway resourceManagerGateway;
 
@@ -199,7 +196,6 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
 
                // do not accept any requests
                jobManagerLeaderId = null;
-               resourceManagerLeaderId = null;
                resourceManagerGateway = null;
 
                // Clear (but not release!) the available slots. The 
TaskManagers should re-register them
@@ -240,8 +236,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
        // 
------------------------------------------------------------------------
 
        @Override
-       public void connectToResourceManager(UUID resourceManagerLeaderId, 
ResourceManagerGateway resourceManagerGateway) {
-               this.resourceManagerLeaderId = 
checkNotNull(resourceManagerLeaderId);
+       public void connectToResourceManager(ResourceManagerGateway 
resourceManagerGateway) {
                this.resourceManagerGateway = 
checkNotNull(resourceManagerGateway);
 
                // work on all slots waiting for this connection
@@ -255,7 +250,6 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
 
        @Override
        public void disconnectResourceManager() {
-               this.resourceManagerLeaderId = null;
                this.resourceManagerGateway = null;
        }
 
@@ -319,7 +313,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway {
                pendingRequests.put(allocationID, new 
PendingRequest(allocationID, future, resources));
 
                CompletableFuture<Acknowledge> rmResponse = 
resourceManagerGateway.requestSlot(
-                       jobManagerLeaderId, resourceManagerLeaderId,
+                       jobManagerLeaderId,
                        new SlotRequest(jobId, allocationID, resources, 
jobManagerAddress),
                        resourceManagerRequestsTimeout);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
index 32a9af5..06c4b12 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import java.util.Collection;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -55,10 +54,9 @@ public interface SlotPoolGateway extends RpcGateway {
         * Connects the SlotPool to the given ResourceManager. After this 
method is called, the
         * SlotPool will be able to request resources from the given 
ResourceManager.
         * 
-        * @param resourceManagerLeaderId The leader session ID of the resource 
manager.
         * @param resourceManagerGateway  The RPC gateway for the resource 
manager.
         */
-       void connectToResourceManager(UUID resourceManagerLeaderId, 
ResourceManagerGateway resourceManagerGateway);
+       void connectToResourceManager(ResourceManagerGateway 
resourceManagerGateway);
 
        /**
         * Disconnects the slot pool from its current Resource Manager. After 
this call, the pool will not

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 55bc8f8..a05242a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -81,6 +81,7 @@ import 
org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -756,17 +757,17 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
        @Override
        public void disconnectResourceManager(
                        final UUID jobManagerLeaderId,
-                       final UUID resourceManagerLeaderId,
+                       final ResourceManagerId resourceManagerId,
                        final Exception cause) {
 
                try {
                        validateLeaderSessionId(jobManagerLeaderId);
                } catch (LeaderIdMismatchException e) {
-                       log.warn("Cannot disconnect resource manager " + 
resourceManagerLeaderId + '.', e);
+                       log.warn("Cannot disconnect resource manager " + 
resourceManagerId + '.', e);
                }
 
                if (resourceManagerConnection != null
-                               && 
resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) {
+                               && 
resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId)) {
                        closeResourceManagerConnection(cause);
                }
        }
@@ -944,11 +945,11 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
                }
        }
 
-       private void notifyOfNewResourceManagerLeader(final String 
resourceManagerAddress, final UUID resourceManagerLeaderId) {
+       private void notifyOfNewResourceManagerLeader(final String 
resourceManagerAddress, final ResourceManagerId resourceManagerId) {
                if (resourceManagerConnection != null) {
                        if (resourceManagerAddress != null) {
-                               if 
(resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress())
-                                       && 
resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) {
+                               if (Objects.equals(resourceManagerAddress, 
resourceManagerConnection.getTargetAddress())
+                                       && Objects.equals(resourceManagerId, 
resourceManagerConnection.getTargetLeaderId())) {
                                        // both address and leader id are not 
changed, we can keep the old connection
                                        return;
                                }
@@ -974,7 +975,7 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
                                getAddress(),
                                leaderSessionID,
                                resourceManagerAddress,
-                               resourceManagerLeaderId,
+                               resourceManagerId,
                                executor);
 
                        resourceManagerConnection.start();
@@ -982,17 +983,17 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
        }
 
        private void establishResourceManagerConnection(final 
JobMasterRegistrationSuccess success) {
-               final UUID resourceManagerLeaderId = 
success.getResourceManagerLeaderId();
+               final ResourceManagerId resourceManagerId = 
success.getResourceManagerId();
        
                // verify the response with current connection
                if (resourceManagerConnection != null
-                               && 
resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) {
+                               && 
Objects.equals(resourceManagerConnection.getTargetLeaderId(), 
resourceManagerId)) {
 
-                       log.info("JobManager successfully registered at 
ResourceManager, leader id: {}.", resourceManagerLeaderId);
+                       log.info("JobManager successfully registered at 
ResourceManager, leader id: {}.", resourceManagerId);
 
                        final ResourceManagerGateway resourceManagerGateway = 
resourceManagerConnection.getTargetGateway();
 
-                       
slotPoolGateway.connectToResourceManager(resourceManagerLeaderId, 
resourceManagerGateway);
+                       
slotPoolGateway.connectToResourceManager(resourceManagerGateway);
 
                        
resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(),
 new HeartbeatTarget<Void>() {
                                @Override
@@ -1005,6 +1006,9 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
                                        // request heartbeat will never be 
called on the job manager side
                                }
                        });
+               } else {
+                       log.debug("Ignoring resource manager connection to {} 
because its a duplicate or outdated.", resourceManagerId);
+
                }
        }
 
@@ -1038,12 +1042,10 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
 
                @Override
                public void notifyLeaderAddress(final String leaderAddress, 
final UUID leaderSessionID) {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       
notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
-                               }
-                       });
+                       runAsync(
+                               () -> notifyOfNewResourceManagerLeader(
+                                       leaderAddress,
+                                       leaderSessionID != null ? new 
ResourceManagerId(leaderSessionID) : null));
                }
 
                @Override
@@ -1055,7 +1057,7 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
        
//----------------------------------------------------------------------------------------------
 
        private class ResourceManagerConnection
-                       extends RegisteredRpcConnection<UUID, 
ResourceManagerGateway, JobMasterRegistrationSuccess>
+                       extends RegisteredRpcConnection<ResourceManagerId, 
ResourceManagerGateway, JobMasterRegistrationSuccess>
        {
                private final JobID jobID;
 
@@ -1074,10 +1076,10 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
                                final String jobManagerRpcAddress,
                                final UUID jobManagerLeaderID,
                                final String resourceManagerAddress,
-                               final UUID resourceManagerLeaderID,
+                               final ResourceManagerId resourceManagerId,
                                final Executor executor)
                {
-                       super(log, resourceManagerAddress, 
resourceManagerLeaderID, executor);
+                       super(log, resourceManagerAddress, resourceManagerId, 
executor);
                        this.jobID = checkNotNull(jobID);
                        this.jobManagerResourceID = 
checkNotNull(jobManagerResourceID);
                        this.jobManagerRpcAddress = 
checkNotNull(jobManagerRpcAddress);
@@ -1085,19 +1087,18 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway {
                }
 
                @Override
-               protected RetryingRegistration<UUID, ResourceManagerGateway, 
JobMasterRegistrationSuccess> generateRegistration() {
-                       return new RetryingRegistration<UUID, 
ResourceManagerGateway, JobMasterRegistrationSuccess>(
+               protected RetryingRegistration<ResourceManagerId, 
ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() {
+                       return new RetryingRegistration<ResourceManagerId, 
ResourceManagerGateway, JobMasterRegistrationSuccess>(
                                        log, getRpcService(), 
"ResourceManager", ResourceManagerGateway.class,
                                        getTargetAddress(), getTargetLeaderId())
                        {
                                @Override
                                protected 
CompletableFuture<RegistrationResponse> invokeRegistration(
-                                               ResourceManagerGateway gateway, 
UUID leaderId, long timeoutMillis) throws Exception
+                                               ResourceManagerGateway gateway, 
ResourceManagerId fencingToken, long timeoutMillis) throws Exception
                                {
                                        Time timeout = 
Time.milliseconds(timeoutMillis);
 
                                        return gateway.registerJobManager(
-                                               leaderId,
                                                jobManagerLeaderID,
                                                jobManagerResourceID,
                                                jobManagerRpcAddress,

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index bfa2930..b39f419 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateServerAddress;
 import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.state.internal.InternalKvState;
 import org.apache.flink.runtime.state.KeyGroupRange;
@@ -123,12 +124,12 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway {
         * Disconnects the resource manager from the job manager because of the 
given cause.
         *
         * @param jobManagerLeaderId identifying the job manager leader id
-        * @param resourceManagerLeaderId identifying the resource manager 
leader id
+        * @param resourceManagerId identifying the resource manager leader id
         * @param cause of the disconnect
         */
        void disconnectResourceManager(
                final UUID jobManagerLeaderId,
-               final UUID resourceManagerLeaderId,
+               final ResourceManagerId resourceManagerId,
                final Exception cause);
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
index a7a6224..94ecfd2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java
@@ -20,8 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-
-import java.util.UUID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -34,16 +33,16 @@ public class JobMasterRegistrationSuccess extends 
RegistrationResponse.Success {
 
        private final long heartbeatInterval;
 
-       private final UUID resourceManagerLeaderId;
+       private final ResourceManagerId resourceManagerId;
 
        private final ResourceID resourceManagerResourceId;
 
        public JobMasterRegistrationSuccess(
                        final long heartbeatInterval,
-                       final UUID resourceManagerLeaderId,
+                       final ResourceManagerId resourceManagerId,
                        final ResourceID resourceManagerResourceId) {
                this.heartbeatInterval = heartbeatInterval;
-               this.resourceManagerLeaderId = 
checkNotNull(resourceManagerLeaderId);
+               this.resourceManagerId = checkNotNull(resourceManagerId);
                this.resourceManagerResourceId = 
checkNotNull(resourceManagerResourceId);
        }
 
@@ -56,8 +55,8 @@ public class JobMasterRegistrationSuccess extends 
RegistrationResponse.Success {
                return heartbeatInterval;
        }
 
-       public UUID getResourceManagerLeaderId() {
-               return resourceManagerLeaderId;
+       public ResourceManagerId getResourceManagerId() {
+               return resourceManagerId;
        }
 
        public ResourceID getResourceManagerResourceId() {
@@ -68,7 +67,7 @@ public class JobMasterRegistrationSuccess extends 
RegistrationResponse.Success {
        public String toString() {
                return "JobMasterRegistrationSuccess{" +
                        "heartbeatInterval=" + heartbeatInterval +
-                       ", resourceManagerLeaderId=" + resourceManagerLeaderId +
+                       ", resourceManagerLeaderId=" + resourceManagerId +
                        ", resourceManagerResourceId=" + 
resourceManagerResourceId +
                        '}';
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 9a4a76a..95f430c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -419,14 +420,14 @@ public class MiniCluster {
                        final LeaderAddressAndId addressAndId = 
addressAndIdFuture.get();
 
                        final ResourceManagerGateway resourceManager = 
-                                       
commonRpcService.connect(addressAndId.leaderAddress(), 
ResourceManagerGateway.class).get();
+                                       
commonRpcService.connect(addressAndId.leaderAddress(), new 
ResourceManagerId(addressAndId.leaderId()), ResourceManagerGateway.class).get();
 
                        final int numTaskManagersToWaitFor = 
taskManagers.length;
 
                        // poll and wait until enough TaskManagers are available
                        while (true) {
                                int numTaskManagersAvailable = 
-                                               
resourceManager.getNumberOfRegisteredTaskManagers(addressAndId.leaderId()).get();
+                                               
resourceManager.getNumberOfRegisteredTaskManagers().get();
 
                                if (numTaskManagersAvailable >= 
numTaskManagersToWaitFor) {
                                        break;

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index be30c68..ce4a798 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.registration;
 
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
 
@@ -176,7 +177,16 @@ public abstract class RetryingRegistration<F extends 
Serializable, G extends Rpc
        public void startRegistration() {
                try {
                        // trigger resolution of the resource manager address 
to a callable gateway
-                       CompletableFuture<G> resourceManagerFuture = 
rpcService.connect(targetAddress, targetType);
+                       final CompletableFuture<G> resourceManagerFuture;
+
+                       if 
(FencedRpcGateway.class.isAssignableFrom(targetType)) {
+                               resourceManagerFuture = (CompletableFuture<G>) 
rpcService.connect(
+                                       targetAddress,
+                                       fencingToken,
+                                       
targetType.asSubclass(FencedRpcGateway.class));
+                       } else {
+                               resourceManagerFuture = 
rpcService.connect(targetAddress, targetType);
+                       }
 
                        // upon success, start the registration attempts
                        CompletableFuture<Void> resourceManagerAcceptFuture = 
resourceManagerFuture.thenAcceptAsync(

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 6b2c898..659b3d4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -33,7 +33,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
@@ -50,11 +49,10 @@ import 
org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActio
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import 
org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
-import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
-import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.util.ExceptionUtils;
@@ -77,12 +75,12 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  *
  * <p>It offers the following methods as part of its rpc interface to interact 
with him remotely:
  * <ul>
- *     <li>{@link #registerJobManager(UUID, UUID, ResourceID, String, JobID, 
Time)} registers a {@link JobMaster} at the resource manager</li>
- *     <li>{@link #requestSlot(UUID, UUID, SlotRequest, Time)} requests a slot 
from the resource manager</li>
+ *     <li>{@link #registerJobManager(UUID, ResourceID, String, JobID, Time)} 
registers a {@link JobMaster} at the resource manager</li>
+ *     <li>{@link #requestSlot(UUID, SlotRequest, Time)} requests a slot from 
the resource manager</li>
  * </ul>
  */
 public abstract class ResourceManager<WorkerType extends Serializable>
-               extends RpcEndpoint
+               extends FencedRpcEndpoint<ResourceManagerId>
                implements ResourceManagerGateway, LeaderContender {
 
        public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
@@ -126,9 +124,6 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        /** The service to elect a ResourceManager leader. */
        private LeaderElectionService leaderElectionService;
 
-       /** ResourceManager's leader session id which is updated on leader 
election. */
-       private volatile UUID leaderSessionId;
-
        /** All registered listeners for status updates of the ResourceManager. 
*/
        private ConcurrentMap<String, InfoMessageListenerRpcGateway> 
infoMessageListeners;
 
@@ -144,7 +139,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                        JobLeaderIdService jobLeaderIdService,
                        FatalErrorHandler fatalErrorHandler) {
 
-               super(rpcService, resourceManagerEndpointId);
+               super(rpcService, resourceManagerEndpointId, 
ResourceManagerId.generate());
 
                this.resourceId = checkNotNull(resourceId);
                this.resourceManagerConfiguration = 
checkNotNull(resourceManagerConfiguration);
@@ -169,7 +164,6 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                this.jobManagerRegistrations = new HashMap<>(4);
                this.jmResourceIdRegistrations = new HashMap<>(4);
                this.taskExecutors = new HashMap<>(8);
-               this.leaderSessionId = null;
                infoMessageListeners = new ConcurrentHashMap<>(8);
        }
 
@@ -246,147 +240,109 @@ public abstract class ResourceManager<WorkerType 
extends Serializable>
 
        @Override
        public CompletableFuture<RegistrationResponse> registerJobManager(
-                       final UUID resourceManagerLeaderId,
                        final UUID jobManagerLeaderId,
                        final ResourceID jobManagerResourceId,
                        final String jobManagerAddress,
                        final JobID jobId,
                        final Time timeout) {
 
-               checkNotNull(resourceManagerLeaderId);
                checkNotNull(jobManagerLeaderId);
                checkNotNull(jobManagerResourceId);
                checkNotNull(jobManagerAddress);
                checkNotNull(jobId);
 
-               if (isValid(resourceManagerLeaderId)) {
-                       if (!jobLeaderIdService.containsJob(jobId)) {
-                               try {
-                                       jobLeaderIdService.addJob(jobId);
-                               } catch (Exception e) {
-                                       ResourceManagerException exception = 
new ResourceManagerException("Could not add the job " +
-                                               jobId + " to the job id leader 
service.", e);
+               if (!jobLeaderIdService.containsJob(jobId)) {
+                       try {
+                               jobLeaderIdService.addJob(jobId);
+                       } catch (Exception e) {
+                               ResourceManagerException exception = new 
ResourceManagerException("Could not add the job " +
+                                       jobId + " to the job id leader 
service.", e);
 
                                        onFatalError(exception);
 
-                                       log.error("Could not add job {} to job 
leader id service.", jobId, e);
-                                       return 
FutureUtils.completedExceptionally(exception);
-                               }
+                               log.error("Could not add job {} to job leader 
id service.", jobId, e);
+                               return 
FutureUtils.completedExceptionally(exception);
                        }
+               }
 
-                       log.info("Registering job manager {}@{} for job {}.", 
jobManagerLeaderId, jobManagerAddress, jobId);
+               log.info("Registering job manager {}@{} for job {}.", 
jobManagerLeaderId, jobManagerAddress, jobId);
 
-                       CompletableFuture<UUID> jobLeaderIdFuture;
+               CompletableFuture<UUID> jobLeaderIdFuture;
 
-                       try {
-                               jobLeaderIdFuture = 
jobLeaderIdService.getLeaderId(jobId);
-                       } catch (Exception e) {
-                               // we cannot check the job leader id so let's 
fail
-                               // TODO: Maybe it's also ok to skip this check 
in case that we cannot check the leader id
-                               ResourceManagerException exception = new 
ResourceManagerException("Cannot obtain the " +
-                                       "job leader id future to verify the 
correct job leader.", e);
+               try {
+                       jobLeaderIdFuture = 
jobLeaderIdService.getLeaderId(jobId);
+               } catch (Exception e) {
+                       // we cannot check the job leader id so let's fail
+                       // TODO: Maybe it's also ok to skip this check in case 
that we cannot check the leader id
+                       ResourceManagerException exception = new 
ResourceManagerException("Cannot obtain the " +
+                               "job leader id future to verify the correct job 
leader.", e);
 
                                onFatalError(exception);
 
-                               log.debug("Could not obtain the job leader id 
future to verify the correct job leader.");
-                               return 
FutureUtils.completedExceptionally(exception);
-                       }
-
-                       CompletableFuture<JobMasterGateway> 
jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, 
JobMasterGateway.class);
-
-                       CompletableFuture<RegistrationResponse> 
registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(
-                               jobLeaderIdFuture,
-                               (JobMasterGateway jobMasterGateway, UUID 
jobLeaderId) -> {
-                                       if (isValid(resourceManagerLeaderId)) {
-                                               if (Objects.equals(jobLeaderId, 
jobManagerLeaderId)) {
-                                                       return 
registerJobMasterInternal(
-                                                               
jobMasterGateway,
-                                                               jobLeaderId,
-                                                               jobId,
-                                                               
jobManagerAddress,
-                                                               
jobManagerResourceId);
-                                               } else {
-                                                       log.debug("The job 
manager leader id {} did not match the job " +
-                                                               "leader id 
{}.", jobManagerLeaderId, jobLeaderId);
-                                                       return new 
RegistrationResponse.Decline("Job manager leader id did not match.");
-                                               }
-                                       } else {
-                                               log.debug("The resource manager 
leader id changed {}. Discarding job " +
-                                                       "manager registration 
from {}.", getLeaderSessionId(), jobManagerAddress);
-                                               return new 
RegistrationResponse.Decline("Resource manager leader id changed.");
-                                       }
-                               },
-                               getMainThreadExecutor());
+                       log.debug("Could not obtain the job leader id future to 
verify the correct job leader.");
+                       return FutureUtils.completedExceptionally(exception);
+               }
 
-                       // handle exceptions which might have occurred in one 
of the futures inputs of combine
-                       return registrationResponseFuture.handleAsync(
-                               (RegistrationResponse registrationResponse, 
Throwable throwable) -> {
-                                       if (throwable != null) {
-                                               if (log.isDebugEnabled()) {
-                                                       log.debug("Registration 
of job manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress, 
throwable);
-                                               } else {
-                                                       log.info("Registration 
of job manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress);
-                                               }
+               CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = 
getRpcService().connect(jobManagerAddress, JobMasterGateway.class);
 
-                                               return new 
RegistrationResponse.Decline(throwable.getMessage());
+               CompletableFuture<RegistrationResponse> 
registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync(
+                       jobLeaderIdFuture,
+                       (JobMasterGateway jobMasterGateway, UUID jobLeaderId) 
-> {
+                               if (Objects.equals(jobLeaderId, 
jobManagerLeaderId)) {
+                                       return registerJobMasterInternal(
+                                               jobMasterGateway,
+                                               jobLeaderId,
+                                               jobId,
+                                               jobManagerAddress,
+                                               jobManagerResourceId);
+                               } else {
+                                       log.debug("The job manager leader id {} 
did not match the job " +
+                                               "leader id {}.", 
jobManagerLeaderId, jobLeaderId);
+                                       return new 
RegistrationResponse.Decline("Job manager leader id did not match.");
+                               }
+                       },
+                       getMainThreadExecutor());
+
+               // handle exceptions which might have occurred in one of the 
futures inputs of combine
+               return registrationResponseFuture.handleAsync(
+                       (RegistrationResponse registrationResponse, Throwable 
throwable) -> {
+                               if (throwable != null) {
+                                       if (log.isDebugEnabled()) {
+                                               log.debug("Registration of job 
manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress, throwable);
                                        } else {
-                                               return registrationResponse;
+                                               log.info("Registration of job 
manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress);
                                        }
-                               },
-                               getRpcService().getExecutor());
-               } else {
-                       log.debug("Discard register job manager message from 
{}, because the leader id " +
-                               "{} did not match the expected leader id {}.", 
jobManagerAddress,
-                               resourceManagerLeaderId, leaderSessionId);
 
-                       return CompletableFuture.completedFuture(
-                               new RegistrationResponse.Decline("Resource 
manager leader id did not match."));
-               }
+                                       return new 
RegistrationResponse.Decline(throwable.getMessage());
+                               } else {
+                                       return registrationResponse;
+                               }
+                       },
+                       getRpcService().getExecutor());
        }
 
-       /**
-        * Register a {@link TaskExecutor} at the resource manager.
-        *
-        * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader
-        * @param taskExecutorAddress      The address of the TaskExecutor that 
registers
-        * @param taskExecutorResourceId  The resource ID of the TaskExecutor 
that registers
-        *
-        * @return The response by the ResourceManager.
-        */
        @Override
        public CompletableFuture<RegistrationResponse> registerTaskExecutor(
-                       final UUID resourceManagerLeaderId,
                        final String taskExecutorAddress,
                        final ResourceID taskExecutorResourceId,
                        final SlotReport slotReport,
                        final Time timeout) {
 
-               if (Objects.equals(leaderSessionId, resourceManagerLeaderId)) {
-                       CompletableFuture<TaskExecutorGateway> 
taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, 
TaskExecutorGateway.class);
+               CompletableFuture<TaskExecutorGateway> 
taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, 
TaskExecutorGateway.class);
 
-                       return taskExecutorGatewayFuture.handleAsync(
-                               (TaskExecutorGateway taskExecutorGateway, 
Throwable throwable) -> {
-                                       if (throwable != null) {
-                                               return new 
RegistrationResponse.Decline(throwable.getMessage());
-                                       } else {
-                                               return 
registerTaskExecutorInternal(
-                                                       taskExecutorGateway,
-                                                       taskExecutorAddress,
-                                                       taskExecutorResourceId,
-                                                       slotReport);
-                                       }
-                               },
-                               getMainThreadExecutor());
-               } else {
-                       log.warn("Discard registration from TaskExecutor {} at 
({}) because the expected leader session ID {} did " +
-                                       "not equal the received leader session 
ID  {}",
-                               taskExecutorResourceId, taskExecutorAddress, 
leaderSessionId, resourceManagerLeaderId);
-
-                       return CompletableFuture.completedFuture(
-                               new RegistrationResponse.Decline("Discard 
registration because the leader id " +
-                                       resourceManagerLeaderId + " does not 
match the expected leader id " +
-                                       leaderSessionId + '.'));
-               }
+               return taskExecutorGatewayFuture.handleAsync(
+                       (TaskExecutorGateway taskExecutorGateway, Throwable 
throwable) -> {
+                               if (throwable != null) {
+                                       return new 
RegistrationResponse.Decline(throwable.getMessage());
+                               } else {
+                                       return registerTaskExecutorInternal(
+                                               taskExecutorGateway,
+                                               taskExecutorAddress,
+                                               taskExecutorResourceId,
+                                               slotReport);
+                               }
+                       },
+                       getMainThreadExecutor());
        }
 
        @Override
@@ -409,23 +365,12 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                closeJobManagerConnection(jobId, cause);
        }
 
-       /**
-        * Requests a slot from the resource manager.
-        *
-        * @param slotRequest Slot request
-        * @return Slot assignment
-        */
        @Override
        public CompletableFuture<Acknowledge> requestSlot(
                        UUID jobMasterLeaderID,
-                       UUID resourceManagerLeaderID,
                        SlotRequest slotRequest,
                        final Time timeout) {
 
-               if (!Objects.equals(resourceManagerLeaderID, leaderSessionId)) {
-                       return FutureUtils.completedExceptionally(new 
LeaderSessionIDException(resourceManagerLeaderID, leaderSessionId));
-               }
-
                JobID jobId = slotRequest.getJobId();
                JobManagerRegistration jobManagerRegistration = 
jobManagerRegistrations.get(jobId);
 
@@ -452,41 +397,27 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                }
        }
 
-       /**
-        * Notification from a TaskExecutor that a slot has become available.
-        *
-        * @param resourceManagerLeaderId TaskExecutor's resource manager 
leader id
-        * @param instanceID TaskExecutor's instance id
-        * @param slotId The slot id of the available slot
-        */
        @Override
        public void notifySlotAvailable(
-                       final UUID resourceManagerLeaderId,
                        final InstanceID instanceID,
                        final SlotID slotId,
                        final AllocationID allocationId) {
 
-               if (Objects.equals(resourceManagerLeaderId, leaderSessionId)) {
-                       final ResourceID resourceId = slotId.getResourceID();
-                       WorkerRegistration<WorkerType> registration = 
taskExecutors.get(resourceId);
+               final ResourceID resourceId = slotId.getResourceID();
+               WorkerRegistration<WorkerType> registration = 
taskExecutors.get(resourceId);
 
-                       if (registration != null) {
-                               InstanceID registrationId = 
registration.getInstanceID();
+               if (registration != null) {
+                       InstanceID registrationId = 
registration.getInstanceID();
 
-                               if (Objects.equals(registrationId, instanceID)) 
{
-                                       slotManager.freeSlot(slotId, 
allocationId);
-                               } else {
-                                       log.debug("Invalid registration id for 
slot available message. This indicates an" +
-                                               " outdated request.");
-                               }
+                       if (Objects.equals(registrationId, instanceID)) {
+                               slotManager.freeSlot(slotId, allocationId);
                        } else {
-                               log.debug("Could not find registration for 
resource id {}. Discarding the slot available" +
-                                       "message {}.", resourceId, slotId);
+                               log.debug("Invalid registration id for slot 
available message. This indicates an" +
+                                       " outdated request.");
                        }
                } else {
-                       log.debug("Discarding notify slot available message for 
slot {}, because the " +
-                               "leader id {} did not match the expected leader 
id {}.", slotId,
-                               resourceManagerLeaderId, leaderSessionId);
+                       log.debug("Could not find registration for resource id 
{}. Discarding the slot available" +
+                               "message {}.", resourceId, slotId);
                }
        }
 
@@ -545,27 +476,8 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        }
 
        @Override
-       public CompletableFuture<Integer> 
getNumberOfRegisteredTaskManagers(UUID requestLeaderSessionId) {
-               if (Objects.equals(leaderSessionId, requestLeaderSessionId)) {
-                       return 
CompletableFuture.completedFuture(taskExecutors.size());
-               }
-               else {
-                       return FutureUtils.completedExceptionally(new 
LeaderIdMismatchException(leaderSessionId, requestLeaderSessionId));
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  Testing methods
-       // 
------------------------------------------------------------------------
-
-       /**
-        * Gets the leader session id of current resourceManager.
-        *
-        * @return return the leaderSessionId of current resourceManager, this 
returns null until the current resourceManager is granted leadership.
-        */
-       @VisibleForTesting
-       UUID getLeaderSessionId() {
-               return leaderSessionId;
+       public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers() {
+               return CompletableFuture.completedFuture(taskExecutors.size());
        }
 
        // 
------------------------------------------------------------------------
@@ -635,7 +547,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
 
                return new JobMasterRegistrationSuccess(
                        
resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(),
-                       getLeaderSessionId(),
+                       getFencingToken(),
                        resourceId);
        }
 
@@ -706,8 +618,6 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                } catch (Exception e) {
                        onFatalError(new ResourceManagerException("Could not 
properly clear the job leader id service.", e));
                }
-
-               leaderSessionId = null;
        }
 
        /**
@@ -735,7 +645,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                        jmResourceIdRegistrations.remove(jobManagerResourceId);
 
                        // tell the job manager about the disconnect
-                       
jobMasterGateway.disconnectResourceManager(jobManagerLeaderId, 
getLeaderSessionId(), cause);
+                       
jobMasterGateway.disconnectResourceManager(jobManagerLeaderId, 
getFencingToken(), cause);
                } else {
                        log.debug("There was no registered job manager for job 
{}.", jobId);
                }
@@ -765,17 +675,6 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                }
        }
 
-       /**
-        * Checks whether the given resource manager leader id is matching the 
current leader id and
-        * not null.
-        *
-        * @param resourceManagerLeaderId to check
-        * @return True if the given leader id matches the actual leader id and 
is not null; otherwise false
-        */
-       protected boolean isValid(UUID resourceManagerLeaderId) {
-               return Objects.equals(resourceManagerLeaderId, leaderSessionId);
-       }
-
        protected void removeJob(JobID jobId) {
                try {
                        jobLeaderIdService.removeJob(jobId);
@@ -848,29 +747,26 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
         */
        @Override
        public void grantLeadership(final UUID newLeaderSessionID) {
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               log.info("ResourceManager {} was granted 
leadership with leader session ID {}", getAddress(), newLeaderSessionID);
+               runAsyncWithoutFencing(
+                       () -> {
+                               final ResourceManagerId newResourceManagerId = 
new ResourceManagerId(newLeaderSessionID);
+
+                               log.info("ResourceManager {} was granted 
leadership with fencing token {}", getAddress(), newResourceManagerId);
 
                                // clear the state if we've been the leader 
before
-                               if (leaderSessionId != null) {
+                               if (getFencingToken() != null) {
                                        clearState();
                                }
 
-                               leaderSessionId = newLeaderSessionID;
+                               setFencingToken(newResourceManagerId);
 
-                               slotManager.start(leaderSessionId, 
getMainThreadExecutor(), new ResourceManagerActionsImpl());
+                               slotManager.start(getFencingToken(), 
getMainThreadExecutor(), new ResourceManagerActionsImpl());
 
-                               getRpcService().execute(new Runnable() {
-                                       @Override
-                                       public void run() {
+                               getRpcService().execute(
+                                       () ->
                                                // confirming the leader 
session ID might be blocking,
-                                               
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
-                                       }
-                               });
-                       }
-               });
+                                               
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID));
+                       });
        }
 
        /**
@@ -878,16 +774,18 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
         */
        @Override
        public void revokeLeadership() {
-               runAsync(new Runnable() {
-                       @Override
-                       public void run() {
-                               log.info("ResourceManager {} was revoked 
leadership.", getAddress());
+               runAsyncWithoutFencing(
+                       () -> {
+                               final ResourceManagerId newResourceManagerId = 
ResourceManagerId.generate();
+
+                               log.info("ResourceManager {} was revoked 
leadership. Setting fencing token to {}.", getAddress(), newResourceManagerId);
 
                                clearState();
 
+                               setFencingToken(newResourceManagerId);
+
                                slotManager.suspend();
-                       }
-               });
+                       });
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 1ba6893..ac81048 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -26,11 +26,12 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.FencedRpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -38,69 +39,61 @@ import java.util.concurrent.CompletableFuture;
 /**
  * The {@link ResourceManager}'s RPC gateway interface.
  */
-public interface ResourceManagerGateway extends RpcGateway {
+public interface ResourceManagerGateway extends 
FencedRpcGateway<ResourceManagerId> {
 
        /**
         * Register a {@link JobMaster} at the resource manager.
         *
-        * @param resourceManagerLeaderId The fencing token for the 
ResourceManager leader
         * @param jobMasterLeaderId The fencing token for the JobMaster leader
-        * @param jobMasterResourceId   The resource ID of the JobMaster that 
registers
-        * @param jobMasterAddress        The address of the JobMaster that 
registers
-        * @param jobID                   The Job ID of the JobMaster that 
registers
-        * @param timeout                 Timeout for the future to complete
+        * @param jobMasterResourceId The resource ID of the JobMaster that 
registers
+        * @param jobMasterAddress The address of the JobMaster that registers
+        * @param jobId The Job ID of the JobMaster that registers
+        * @param timeout Timeout for the future to complete
         * @return Future registration response
         */
        CompletableFuture<RegistrationResponse> registerJobManager(
-               UUID resourceManagerLeaderId,
                UUID jobMasterLeaderId,
                ResourceID jobMasterResourceId,
                String jobMasterAddress,
-               JobID jobID,
+               JobID jobId,
                @RpcTimeout Time timeout);
 
        /**
         * Requests a slot from the resource manager.
         *
-        * @param resourceManagerLeaderID leader if of the ResourceMaster
         * @param jobMasterLeaderID leader if of the JobMaster
         * @param slotRequest The slot to request
         * @return The confirmation that the slot gets allocated
         */
        CompletableFuture<Acknowledge> requestSlot(
-               UUID resourceManagerLeaderID,
                UUID jobMasterLeaderID,
                SlotRequest slotRequest,
                @RpcTimeout Time timeout);
 
        /**
-        * Register a {@link 
org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager.
+        * Register a {@link TaskExecutor} at the resource manager.
         *
-        * @param resourceManagerLeaderId  The fencing token for the 
ResourceManager leader
-        * @param taskExecutorAddress     The address of the TaskExecutor that 
registers
-        * @param resourceID              The resource ID of the TaskExecutor 
that registers
-        * @param slotReport              The slot report containing free and 
allocated task slots
-        * @param timeout                 The timeout for the response.
+        * @param taskExecutorAddress The address of the TaskExecutor that 
registers
+        * @param resourceId The resource ID of the TaskExecutor that registers
+        * @param slotReport The slot report containing free and allocated task 
slots
+        * @param timeout The timeout for the response.
         *
         * @return The future to the response by the ResourceManager.
         */
        CompletableFuture<RegistrationResponse> registerTaskExecutor(
-               UUID resourceManagerLeaderId,
                String taskExecutorAddress,
-               ResourceID resourceID,
+               ResourceID resourceId,
                SlotReport slotReport,
                @RpcTimeout Time timeout);
 
        /**
         * Sent by the TaskExecutor to notify the ResourceManager that a slot 
has become available.
         *
-        * @param resourceManagerLeaderId The ResourceManager leader id
         * @param instanceId TaskExecutor's instance id
         * @param slotID The SlotID of the freed slot
         * @param oldAllocationId to which the slot has been allocated
         */
        void notifySlotAvailable(
-               UUID resourceManagerLeaderId,
                InstanceID instanceId,
                SlotID slotID,
                AllocationID oldAllocationId);
@@ -130,10 +123,9 @@ public interface ResourceManagerGateway extends RpcGateway {
        /**
         * Gets the currently registered number of TaskManagers.
         * 
-        * @param leaderSessionId The leader session ID with which to address 
the ResourceManager.
         * @return The future to the number of registered TaskManagers.
         */
-       CompletableFuture<Integer> getNumberOfRegisteredTaskManagers(UUID 
leaderSessionId);
+       CompletableFuture<Integer> getNumberOfRegisteredTaskManagers();
 
        /**
         * Sends the heartbeat to resource manager from task manager

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
new file mode 100644
index 0000000..3594e88
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.util.AbstractID;
+
+import java.util.UUID;
+
+/**
+ * Fencing token for the {@link ResourceManager}.
+ */
+public class ResourceManagerId extends AbstractID {
+
+       private static final long serialVersionUID = -6042820142662137374L;
+
+       public ResourceManagerId(byte[] bytes) {
+               super(bytes);
+       }
+
+       public ResourceManagerId(long lowerPart, long upperPart) {
+               super(lowerPart, upperPart);
+       }
+
+       public ResourceManagerId(AbstractID id) {
+               super(id);
+       }
+
+       public ResourceManagerId() {
+       }
+
+       public ResourceManagerId(UUID uuid) {
+               this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits());
+       }
+
+       public UUID toUUID() {
+               return new UUID(getUpperPart(), getLowerPart());
+       }
+
+       public static ResourceManagerId generate() {
+               return new ResourceManagerId();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 5218286..d8eb47c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
 import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection;
@@ -35,6 +36,7 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException;
+import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -45,7 +47,6 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.UUID;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -64,7 +65,7 @@ import java.util.concurrent.TimeoutException;
  * slots are currently not used) and pending slot requests time out triggering 
their release and
  * failure, respectively.
  */
-public class SlotManager implements AutoCloseable {
+public class SlotManager<T extends AbstractID> implements AutoCloseable {
        private static final Logger LOG = 
LoggerFactory.getLogger(SlotManager.class);
 
        /** Scheduled executor for timeouts */
@@ -94,8 +95,8 @@ public class SlotManager implements AutoCloseable {
        /** Map of pending/unfulfilled slot allocation requests */
        private final HashMap<AllocationID, PendingSlotRequest> 
pendingSlotRequests;
 
-       /** Leader id of the containing component */
-       private UUID leaderId;
+       /** ResourceManager's id */
+       private ResourceManagerId resourceManagerId;
 
        /** Executor for future callbacks which have to be "synchronized" */
        private Executor mainThreadExecutor;
@@ -126,7 +127,7 @@ public class SlotManager implements AutoCloseable {
                fulfilledSlotRequests = new HashMap<>(16);
                pendingSlotRequests = new HashMap<>(16);
 
-               leaderId = null;
+               resourceManagerId = null;
                resourceManagerActions = null;
                mainThreadExecutor = null;
                taskManagerTimeoutCheck = null;
@@ -142,13 +143,14 @@ public class SlotManager implements AutoCloseable {
        /**
         * Starts the slot manager with the given leader id and resource 
manager actions.
         *
-        * @param newLeaderId to use for communication with the task managers
+        * @param newResourceManagerId to use for communication with the task 
managers
+        * @param newMainThreadExecutor to use to run code in the 
ResourceManager's main thread
         * @param newResourceManagerActions to use for resource (de-)allocations
         */
-       public void start(UUID newLeaderId, Executor newMainThreadExecutor, 
ResourceManagerActions newResourceManagerActions) {
+       public void start(ResourceManagerId newResourceManagerId, Executor 
newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) {
                LOG.info("Starting the SlotManager.");
 
-               leaderId = Preconditions.checkNotNull(newLeaderId);
+               this.resourceManagerId = 
Preconditions.checkNotNull(newResourceManagerId);
                mainThreadExecutor = 
Preconditions.checkNotNull(newMainThreadExecutor);
                resourceManagerActions = 
Preconditions.checkNotNull(newResourceManagerActions);
 
@@ -204,7 +206,7 @@ public class SlotManager implements AutoCloseable {
                        unregisterTaskManager(registeredTaskManager);
                }
 
-               leaderId = null;
+               resourceManagerId = null;
                resourceManagerActions = null;
                started = false;
        }
@@ -643,7 +645,7 @@ public class SlotManager implements AutoCloseable {
                        pendingSlotRequest.getJobId(),
                        allocationId,
                        pendingSlotRequest.getTargetAddress(),
-                       leaderId,
+                       resourceManagerId,
                        taskManagerRequestTimeout);
 
                requestFuture.whenComplete(

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
index 6d1f22c..f564df4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -325,7 +326,7 @@ public class JobLeaderService {
                        @Override
                        protected void 
onRegistrationSuccess(JMTMRegistrationSuccess success) {
                                // filter out old registration attempts
-                               if 
(getTargetLeaderId().equals(currentLeaderId)) {
+                               if (Objects.equals(getTargetLeaderId(), 
currentLeaderId)) {
                                        log.info("Successful registration at 
job manager {} for job {}.", getTargetAddress(), jobId);
 
                                        
jobLeaderListener.jobManagerGainedLeadership(jobId, getTargetGateway(), 
getTargetLeaderId(), success);
@@ -337,8 +338,8 @@ public class JobLeaderService {
                        @Override
                        protected void onRegistrationFailure(Throwable failure) 
{
                                // filter out old registration attempts
-                               if 
(getTargetLeaderId().equals(currentLeaderId)) {
-                                       log.info("Failed to register at job 
manager {} for job {}.", getTargetAddress(), jobId);
+                               if (Objects.equals(getTargetLeaderId(), 
currentLeaderId)) {
+                                       log.info("Failed to register at job  
manager {} for job {}.", getTargetAddress(), jobId);
                                        jobLeaderListener.handleError(failure);
                                } else {
                                        log.debug("Obsolete JobManager 
registration failure from {} with leader session ID {}.", getTargetAddress(), 
getTargetLeaderId(), failure);

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index ef47ad4..3b1a1b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -59,6 +59,7 @@ import 
org.apache.flink.runtime.registration.RegistrationConnectionListener;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -585,29 +586,18 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        // Slot allocation RPCs
        // 
----------------------------------------------------------------------
 
-       /**
-        * Requests a slot from the TaskManager
-        *
-        * @param slotId identifying the requested slot
-        * @param jobId identifying the job for which the request is issued
-        * @param allocationId id for the request
-        * @param targetAddress of the job manager requesting the slot
-        * @param rmLeaderId current leader id of the ResourceManager
-        * @throws SlotAllocationException if the slot allocation fails
-        * @return answer to the slot request
-        */
        @Override
        public CompletableFuture<Acknowledge> requestSlot(
                final SlotID slotId,
                final JobID jobId,
                final AllocationID allocationId,
                final String targetAddress,
-               final UUID rmLeaderId,
+               final ResourceManagerId resourceManagerId,
                final Time timeout) {
                // TODO: Filter invalid requests from the resource manager by 
using the instance/registration Id
 
                log.info("Receive slot request {} for job {} from resource 
manager with leader id {}.",
-                       allocationId, jobId, rmLeaderId);
+                       allocationId, jobId, resourceManagerId);
 
                try {
                        if (resourceManagerConnection == null) {
@@ -616,8 +606,8 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                throw new SlotAllocationException(message);
                        }
 
-                       if 
(!resourceManagerConnection.getTargetLeaderId().equals(rmLeaderId)) {
-                               final String message = "The leader id " + 
rmLeaderId +
+                       if 
(!Objects.equals(resourceManagerConnection.getTargetLeaderId(), 
resourceManagerId)) {
+                               final String message = "The leader id " + 
resourceManagerId +
                                        " does not match with the leader id of 
the connected resource manager " +
                                        
resourceManagerConnection.getTargetLeaderId() + '.';
 
@@ -692,7 +682,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        //  Internal resource manager connection methods
        // 
------------------------------------------------------------------------
 
-       private void notifyOfNewResourceManagerLeader(String newLeaderAddress, 
UUID newLeaderId) {
+       private void notifyOfNewResourceManagerLeader(String newLeaderAddress, 
ResourceManagerId newResourceManagerId) {
                if (resourceManagerConnection != null) {
                        if (newLeaderAddress != null) {
                                // the resource manager switched to a new leader
@@ -723,7 +713,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                        getResourceID(),
                                        
taskSlotTable.createSlotReport(getResourceID()),
                                        newLeaderAddress,
-                                       newLeaderId,
+                                       newResourceManagerId,
                                        getMainThreadExecutor(),
                                        new 
ResourceManagerRegistrationListener());
                        resourceManagerConnection.start();
@@ -1079,7 +1069,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                ResourceManagerGateway resourceManagerGateway = 
resourceManagerConnection.getTargetGateway();
 
                                resourceManagerGateway.notifySlotAvailable(
-                                       
resourceManagerConnection.getTargetLeaderId(),
                                        
resourceManagerConnection.getRegistrationId(),
                                        new SlotID(getResourceID(), 
freedSlotIndex),
                                        allocationId);
@@ -1169,12 +1158,10 @@ public class TaskExecutor extends RpcEndpoint 
implements TaskExecutorGateway {
 
                @Override
                public void notifyLeaderAddress(final String leaderAddress, 
final UUID leaderSessionID) {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
-                                       
notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID);
-                               }
-                       });
+                       runAsync(
+                               () -> notifyOfNewResourceManagerLeader(
+                                       leaderAddress,
+                                       leaderSessionID != null ? new 
ResourceManagerId(leaderSessionID) : null));
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 8084154..fd56255 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -44,8 +45,11 @@ public interface TaskExecutorGateway extends RpcGateway {
         * Requests a slot from the TaskManager
         *
         * @param slotId slot id for the request
+        * @param jobId for which to request a slot
         * @param allocationId id for the request
-        * @param resourceManagerLeaderId current leader id of the 
ResourceManager
+        * @param targetAddress to which to offer the requested slots
+        * @param resourceManagerId current leader id of the ResourceManager
+        * @param timeout for the operation
         * @return answer to the slot request
         */
        CompletableFuture<Acknowledge> requestSlot(
@@ -53,7 +57,7 @@ public interface TaskExecutorGateway extends RpcGateway {
                JobID jobId,
                AllocationID allocationId,
                String targetAddress,
-               UUID resourceManagerLeaderId,
+               ResourceManagerId resourceManagerId,
                @RpcTimeout Time timeout);
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 24eb540..c3d3532 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.registration.RegisteredRpcConnection;
 import org.apache.flink.runtime.registration.RegistrationConnectionListener;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.registration.RetryingRegistration;
@@ -31,7 +32,6 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -41,7 +41,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * The connection between a TaskExecutor and the ResourceManager.
  */
 public class TaskExecutorToResourceManagerConnection
-               extends RegisteredRpcConnection<UUID, ResourceManagerGateway, 
TaskExecutorRegistrationSuccess> {
+               extends RegisteredRpcConnection<ResourceManagerId, 
ResourceManagerGateway, TaskExecutorRegistrationSuccess> {
 
        private final RpcService rpcService;
 
@@ -64,11 +64,11 @@ public class TaskExecutorToResourceManagerConnection
                        ResourceID taskManagerResourceId,
                        SlotReport slotReport,
                        String resourceManagerAddress,
-                       UUID resourceManagerLeaderId,
+                       ResourceManagerId resourceManagerId,
                        Executor executor,
                        
RegistrationConnectionListener<TaskExecutorRegistrationSuccess> 
registrationListener) {
 
-               super(log, resourceManagerAddress, resourceManagerLeaderId, 
executor);
+               super(log, resourceManagerAddress, resourceManagerId, executor);
 
                this.rpcService = Preconditions.checkNotNull(rpcService);
                this.taskManagerAddress = 
Preconditions.checkNotNull(taskManagerAddress);
@@ -79,7 +79,7 @@ public class TaskExecutorToResourceManagerConnection
 
 
        @Override
-       protected RetryingRegistration<UUID, ResourceManagerGateway, 
TaskExecutorRegistrationSuccess> generateRegistration() {
+       protected RetryingRegistration<ResourceManagerId, 
ResourceManagerGateway, TaskExecutorRegistrationSuccess> generateRegistration() 
{
                return new 
TaskExecutorToResourceManagerConnection.ResourceManagerRegistration(
                        log,
                        rpcService,
@@ -127,7 +127,7 @@ public class TaskExecutorToResourceManagerConnection
        // 
------------------------------------------------------------------------
 
        private static class ResourceManagerRegistration
-                       extends RetryingRegistration<UUID, 
ResourceManagerGateway, TaskExecutorRegistrationSuccess> {
+                       extends RetryingRegistration<ResourceManagerId, 
ResourceManagerGateway, TaskExecutorRegistrationSuccess> {
 
                private final String taskExecutorAddress;
                
@@ -139,12 +139,12 @@ public class TaskExecutorToResourceManagerConnection
                                Logger log,
                                RpcService rpcService,
                                String targetAddress,
-                               UUID leaderId,
+                               ResourceManagerId resourceManagerId,
                                String taskExecutorAddress,
                                ResourceID resourceID,
                                SlotReport slotReport) {
 
-                       super(log, rpcService, "ResourceManager", 
ResourceManagerGateway.class, targetAddress, leaderId);
+                       super(log, rpcService, "ResourceManager", 
ResourceManagerGateway.class, targetAddress, resourceManagerId);
                        this.taskExecutorAddress = 
checkNotNull(taskExecutorAddress);
                        this.resourceID = checkNotNull(resourceID);
                        this.slotReport = checkNotNull(slotReport);
@@ -152,10 +152,10 @@ public class TaskExecutorToResourceManagerConnection
 
                @Override
                protected CompletableFuture<RegistrationResponse> 
invokeRegistration(
-                               ResourceManagerGateway resourceManager, UUID 
leaderId, long timeoutMillis) throws Exception {
+                               ResourceManagerGateway resourceManager, 
ResourceManagerId fencingToken, long timeoutMillis) throws Exception {
 
                        Time timeout = Time.milliseconds(timeoutMillis);
-                       return resourceManager.registerTaskExecutor(leaderId, 
taskExecutorAddress, resourceID, slotReport, timeout);
+                       return 
resourceManager.registerTaskExecutor(taskExecutorAddress, resourceID, 
slotReport, timeout);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 737cede..55499f5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -49,6 +49,7 @@ import 
org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
 import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -532,7 +533,6 @@ public class ResourceManagerTest extends TestLogger {
                        final SlotReport slotReport = new SlotReport();
                        // test registration response successful and it will 
trigger monitor heartbeat target, schedule heartbeat request at interval time
                        CompletableFuture<RegistrationResponse> 
successfulFuture = rmGateway.registerTaskExecutor(
-                               rmLeaderSessionId,
                                taskManagerAddress,
                                taskManagerResourceID,
                                slotReport,
@@ -576,7 +576,7 @@ public class ResourceManagerTest extends TestLogger {
                final String jobMasterAddress = "jm";
                final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
                final ResourceID rmResourceId = ResourceID.generate();
-               final UUID rmLeaderId = UUID.randomUUID();
+               final ResourceManagerId rmLeaderId = 
ResourceManagerId.generate();
                final UUID jmLeaderId = UUID.randomUUID();
                final JobID jobId = new JobID();
 
@@ -629,11 +629,10 @@ public class ResourceManagerTest extends TestLogger {
 
                        final ResourceManagerGateway rmGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
 
-                       rmLeaderElectionService.isLeader(rmLeaderId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                       rmLeaderElectionService.isLeader(rmLeaderId.toUUID()).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
 
                        // test registration response successful and it will 
trigger monitor heartbeat target, schedule heartbeat request at interval time
                        CompletableFuture<RegistrationResponse> 
successfulFuture = rmGateway.registerJobManager(
-                               rmLeaderId,
                                jmLeaderId,
                                jmResourceId,
                                jobMasterAddress,

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 68c43f8..ead453e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -88,7 +88,7 @@ public class SlotPoolTest extends TestLogger {
                        assertFalse(future.isDone());
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
-                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), 
any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
 
                        final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
 
@@ -125,7 +125,7 @@ public class SlotPoolTest extends TestLogger {
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
                        verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds()).times(2))
-                               .requestSlot(any(UUID.class), any(UUID.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
+                               .requestSlot(any(UUID.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
 
                        final List<SlotRequest> slotRequests = 
slotRequestArgumentCaptor.getAllValues();
 
@@ -168,7 +168,7 @@ public class SlotPoolTest extends TestLogger {
                        assertFalse(future1.isDone());
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
-                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), 
any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
 
                        final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
 
@@ -211,7 +211,7 @@ public class SlotPoolTest extends TestLogger {
                        assertFalse(future.isDone());
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
-                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), 
any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
 
                        final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
 
@@ -266,7 +266,7 @@ public class SlotPoolTest extends TestLogger {
                        CompletableFuture<SimpleSlot> future1 = 
slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
-                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), 
any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
 
                        final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
 
@@ -297,7 +297,7 @@ public class SlotPoolTest extends TestLogger {
        private static ResourceManagerGateway 
createResourceManagerGatewayMock() {
                ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
                when(resourceManagerGateway
-                       .requestSlot(any(UUID.class), any(UUID.class), 
any(SlotRequest.class), any(Time.class)))
+                       .requestSlot(any(UUID.class), any(SlotRequest.class), 
any(Time.class)))
                        .thenReturn(mock(CompletableFuture.class, 
RETURNS_MOCKS));
 
                return resourceManagerGateway;
@@ -310,7 +310,7 @@ public class SlotPoolTest extends TestLogger {
 
                slotPool.start(UUID.randomUUID(), jobManagerAddress);
 
-               slotPool.connectToResourceManager(UUID.randomUUID(), 
resourceManagerGateway);
+               slotPool.connectToResourceManager(resourceManagerGateway);
 
                return slotPool.getSelfGateway(SlotPoolGateway.class);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 76a0b93..6282ea0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -167,7 +168,7 @@ public class JobMasterTest extends TestLogger {
        public void testHeartbeatTimeoutWithResourceManager() throws Exception {
                final String resourceManagerAddress = "rm";
                final String jobManagerAddress = "jm";
-               final UUID rmLeaderId = UUID.randomUUID();
+               final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
                final UUID jmLeaderId = UUID.randomUUID();
                final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
                final ResourceID jmResourceId = new 
ResourceID(jobManagerAddress);
@@ -188,13 +189,12 @@ public class JobMasterTest extends TestLogger {
                final ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
                when(resourceManagerGateway.registerJobManager(
                        any(UUID.class),
-                       any(UUID.class),
                        any(ResourceID.class),
                        anyString(),
                        any(JobID.class),
                        any(Time.class)
                )).thenReturn(CompletableFuture.completedFuture(new 
JobMasterRegistrationSuccess(
-                       heartbeatInterval, rmLeaderId, rmResourceId)));
+                       heartbeatInterval, resourceManagerId, rmResourceId)));
 
                final TestingRpcService rpc = new TestingRpcService();
                rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
@@ -225,11 +225,10 @@ public class JobMasterTest extends TestLogger {
                        startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
 
                        // define a leader and see that a registration happens
-                       
rmLeaderRetrievalService.notifyListener(resourceManagerAddress, rmLeaderId);
+                       
rmLeaderRetrievalService.notifyListener(resourceManagerAddress, 
resourceManagerId.toUUID());
 
                        // register job manager success will trigger monitor heartbeat target between jm and rm
                        verify(resourceManagerGateway, 
timeout(testingTimeout.toMilliseconds())).registerJobManager(
-                               eq(rmLeaderId),
                                eq(jmLeaderId),
                                eq(jmResourceId),
                                anyString(),

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
index 5568467..ccbb4f4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
@@ -205,6 +205,10 @@ public class MetricRegistryTest extends TestLogger {
                MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
 
                long start = System.currentTimeMillis();
+
+               // only start counting from now on
+               TestReporter3.reportCount = 0;
+
                for (int x = 0; x < 10; x++) {
                        Thread.sleep(100);
                        int reportCount = TestReporter3.reportCount;
@@ -218,7 +222,7 @@ public class MetricRegistryTest extends TestLogger {
                         * or after T=50.
                         */
                        long maxAllowedReports = (curT - start) / 50 + 2;
-                       Assert.assertTrue("Too many report were triggered.", 
maxAllowedReports >= reportCount);
+                       Assert.assertTrue("Too many reports were triggered.", 
maxAllowedReports >= reportCount);
                }
                Assert.assertTrue("No report was triggered.", 
TestReporter3.reportCount > 0);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
index c213752..2b8792b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java
@@ -82,7 +82,7 @@ public class ResourceManagerHATest extends TestLogger {
 
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
-               CompletableFuture<UUID> revokedLeaderIdFuture = new 
CompletableFuture<>();
+               CompletableFuture<ResourceManagerId> revokedLeaderIdFuture = 
new CompletableFuture<>();
 
                final ResourceManager resourceManager =
                        new StandaloneResourceManager(
@@ -100,23 +100,28 @@ public class ResourceManagerHATest extends TestLogger {
                                @Override
                                public void revokeLeadership() {
                                        super.revokeLeadership();
-                                       runAsync(
-                                               () -> 
revokedLeaderIdFuture.complete(getLeaderSessionId()));
+                                       runAsyncWithoutFencing(
+                                               () -> 
revokedLeaderIdFuture.complete(getFencingToken()));
                                }
                        };
-               resourceManager.start();
-               // before grant leadership, resourceManager's leaderId is null
-               Assert.assertEquals(null, resourceManager.getLeaderSessionId());
-               final UUID leaderId = UUID.randomUUID();
-               leaderElectionService.isLeader(leaderId);
-               // after grant leadership, resourceManager's leaderId has value
-               Assert.assertEquals(leaderId, leaderSessionIdFuture.get());
-               // then revoke leadership, resourceManager's leaderId is null 
again
-               leaderElectionService.notLeader();
-               Assert.assertEquals(null, revokedLeaderIdFuture.get());
-
-               if (testingFatalErrorHandler.hasExceptionOccurred()) {
-                       testingFatalErrorHandler.rethrowError();
+
+               try {
+                       resourceManager.start();
+
+                       Assert.assertNotNull(resourceManager.getFencingToken());
+                       final UUID leaderId = UUID.randomUUID();
+                       leaderElectionService.isLeader(leaderId);
+                       // after grant leadership, resourceManager's leaderId has value
+                       Assert.assertEquals(leaderId, 
leaderSessionIdFuture.get());
+                       // then revoke leadership, resourceManager's leaderId should be different
+                       leaderElectionService.notLeader();
+                       Assert.assertNotEquals(leaderId, 
revokedLeaderIdFuture.get());
+
+                       if (testingFatalErrorHandler.hasExceptionOccurred()) {
+                               testingFatalErrorHandler.rethrowError();
+                       }
+               } finally {
+                       rpcService.stopService();
                }
        }
 }

Reply via email to