Repository: flink
Updated Branches:
  refs/heads/master ff1660629 -> ba03b78c7


http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 3b1a1b4..b6a0637 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -51,14 +51,15 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.registration.RegistrationConnectionListener;
-import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.registration.RegistrationConnectionListener;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -302,7 +303,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        @Override
        public CompletableFuture<Acknowledge> submitTask(
                        TaskDeploymentDescriptor tdd,
-                       UUID jobManagerLeaderId,
+                       JobMasterId jobMasterId,
                        Time timeout) {
 
                try {
@@ -327,10 +328,10 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                throw new TaskSubmissionException(message);
                        }
 
-                       if (!Objects.equals(jobManagerConnection.getLeaderId(), 
jobManagerLeaderId)) {
+                       if 
(!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) {
                                final String message = "Rejecting the task 
submission because the job manager leader id " +
-                                       jobManagerLeaderId + " does not match 
the expected job manager leader id " +
-                                       jobManagerConnection.getLeaderId() + 
'.';
+                                       jobMasterId + " does not match the 
expected job manager leader id " +
+                                       jobManagerConnection.getJobMasterId() + 
'.';
 
                                log.debug(message);
                                throw new TaskSubmissionException(message);
@@ -353,9 +354,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                tdd.getAttemptNumber());
 
                        InputSplitProvider inputSplitProvider = new 
RpcInputSplitProvider(
-                               jobManagerConnection.getLeaderId(),
                                jobManagerConnection.getJobManagerGateway(),
-                               jobInformation.getJobId(),
                                taskInformation.getJobVertexId(),
                                tdd.getExecutionAttemptId(),
                                taskManagerConfiguration.getTimeout());
@@ -766,7 +765,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                final JobMasterGateway jobMasterGateway = 
jobManagerConnection.getJobManagerGateway();
 
                                final Iterator<TaskSlot> reservedSlotsIterator 
= taskSlotTable.getAllocatedSlots(jobId);
-                               final UUID leaderId = 
jobManagerConnection.getLeaderId();
+                               final JobMasterId jobMasterId = 
jobManagerConnection.getJobMasterId();
 
                                final Collection<SlotOffer> reservedSlots = new 
HashSet<>(2);
 
@@ -777,13 +776,11 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                                        // the slot is either 
free or releasing at the moment
                                                        final String message = 
"Could not mark slot " + jobId + " active.";
                                                        log.debug(message);
-                                                       
jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(),
-                                                               leaderId, new 
Exception(message));
+                                                       
jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new 
Exception(message));
                                                }
                                        } catch (SlotNotFoundException e) {
                                                final String message = "Could 
not mark slot " + jobId + " active.";
-                                               
jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(),
-                                                       leaderId, new 
Exception(message));
+                                               
jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new 
Exception(message));
                                                continue;
                                        }
                                        reservedSlots.add(offer);
@@ -792,7 +789,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                CompletableFuture<Collection<SlotOffer>> 
acceptedSlotsFuture = jobMasterGateway.offerSlots(
                                        getResourceID(),
                                        reservedSlots,
-                                       leaderId,
                                        taskManagerConfiguration.getTimeout());
 
                                acceptedSlotsFuture.whenCompleteAsync(
@@ -813,7 +809,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                                        }
                                                } else {
                                                        // check if the 
response is still valid
-                                                       if 
(isJobManagerConnectionValid(jobId, leaderId)) {
+                                                       if 
(isJobManagerConnectionValid(jobId, jobMasterId)) {
                                                                // mark 
accepted slots active
                                                                for (SlotOffer 
acceptedSlot : acceptedSlots) {
                                                                        
reservedSlots.remove(acceptedSlot);
@@ -839,14 +835,14 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                }
        }
 
-       private void establishJobManagerConnection(JobID jobId, final 
JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, 
JMTMRegistrationSuccess registrationSuccess) {
+       private void establishJobManagerConnection(JobID jobId, final 
JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess registrationSuccess) 
{
 
                if (jobManagerTable.contains(jobId)) {
                        JobManagerConnection oldJobManagerConnection = 
jobManagerTable.get(jobId);
 
-                       if 
(Objects.equals(oldJobManagerConnection.getLeaderId(), jobManagerLeaderId)) {
+                       if 
(Objects.equals(oldJobManagerConnection.getJobMasterId(), 
jobMasterGateway.getFencingToken())) {
                                // we already are connected to the given job 
manager
-                               log.debug("Ignore JobManager gained leadership 
message for {} because we are already connected to it.", jobManagerLeaderId);
+                               log.debug("Ignore JobManager gained leadership 
message for {} because we are already connected to it.", 
jobMasterGateway.getFencingToken());
                                return;
                        } else {
                                closeJobManagerConnection(jobId, new 
Exception("Found new job leader for job id " + jobId + '.'));
@@ -860,7 +856,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                                jobId,
                                jobManagerResourceID,
                                jobMasterGateway,
-                               jobManagerLeaderId,
                                registrationSuccess.getBlobPort());
                jobManagerConnections.put(jobManagerResourceID, 
newJobManagerConnection);
                jobManagerTable.put(jobId, newJobManagerConnection);
@@ -927,15 +922,13 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        JobID jobID,
                        ResourceID resourceID,
                        JobMasterGateway jobMasterGateway,
-                       UUID jobManagerLeaderId,
                        int blobPort) {
                Preconditions.checkNotNull(jobID);
                Preconditions.checkNotNull(resourceID);
-               Preconditions.checkNotNull(jobManagerLeaderId);
                Preconditions.checkNotNull(jobMasterGateway);
                Preconditions.checkArgument(blobPort > 0 || blobPort < 
MAX_BLOB_PORT, "Blob server port is out of range.");
 
-               TaskManagerActions taskManagerActions = new 
TaskManagerActionsImpl(jobManagerLeaderId, jobMasterGateway);
+               TaskManagerActions taskManagerActions = new 
TaskManagerActionsImpl(jobMasterGateway);
 
                CheckpointResponder checkpointResponder = new 
RpcCheckpointResponder(jobMasterGateway);
 
@@ -959,18 +952,16 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                }
 
                ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
-                       jobManagerLeaderId,
                        jobMasterGateway,
                        getRpcService().getExecutor(),
                        taskManagerConfiguration.getTimeout());
 
-               PartitionProducerStateChecker partitionStateChecker = new 
RpcPartitionStateChecker(jobManagerLeaderId, jobMasterGateway);
+               PartitionProducerStateChecker partitionStateChecker = new 
RpcPartitionStateChecker(jobMasterGateway);
 
                return new JobManagerConnection(
                        jobID,
                        resourceID,
                        jobMasterGateway,
-                       jobManagerLeaderId,
                        taskManagerActions,
                        checkpointResponder,
                        blobCache,
@@ -1006,13 +997,12 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        }
 
        private void updateTaskExecutionState(
-                       final UUID jobMasterLeaderId,
                        final JobMasterGateway jobMasterGateway,
                        final TaskExecutionState taskExecutionState)
        {
                final ExecutionAttemptID executionAttemptID = 
taskExecutionState.getID();
 
-               CompletableFuture<Acknowledge> futureAcknowledge = 
jobMasterGateway.updateTaskExecutionState(jobMasterLeaderId, 
taskExecutionState);
+               CompletableFuture<Acknowledge> futureAcknowledge = 
jobMasterGateway.updateTaskExecutionState(taskExecutionState);
 
                futureAcknowledge.whenCompleteAsync(
                        (ack, throwable) -> {
@@ -1024,7 +1014,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        }
 
        private void unregisterTaskAndNotifyFinalState(
-                       final UUID jobMasterLeaderId,
                        final JobMasterGateway jobMasterGateway,
                        final ExecutionAttemptID executionAttemptID) {
 
@@ -1044,7 +1033,6 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        AccumulatorSnapshot accumulatorSnapshot = 
task.getAccumulatorRegistry().getSnapshot();
 
                        updateTaskExecutionState(
-                                       jobMasterLeaderId,
                                        jobMasterGateway,
                                        new TaskExecutionState(
                                                task.getJobID(),
@@ -1101,10 +1089,10 @@ public class TaskExecutor extends RpcEndpoint 
implements TaskExecutorGateway {
                return (resourceManagerConnection != null && 
resourceManagerConnection.isConnected());
        }
 
-       private boolean isJobManagerConnectionValid(JobID jobId, UUID leaderId) 
{
+       private boolean isJobManagerConnectionValid(JobID jobId, JobMasterId 
jobMasterId) {
                JobManagerConnection jmConnection = jobManagerTable.get(jobId);
 
-               return jmConnection != null && 
Objects.equals(jmConnection.getLeaderId(), leaderId);
+               return jmConnection != null && 
Objects.equals(jmConnection.getJobMasterId(), jobMasterId);
        }
 
        // 
------------------------------------------------------------------------
@@ -1152,7 +1140,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        // 
------------------------------------------------------------------------
 
        /**
-        * The listener for leader changes of the resource manager
+        * The listener for leader changes of the resource manager.
         */
        private final class ResourceManagerLeaderListener implements 
LeaderRetrievalListener {
 
@@ -1176,23 +1164,18 @@ public class TaskExecutor extends RpcEndpoint 
implements TaskExecutorGateway {
                public void jobManagerGainedLeadership(
                        final JobID jobId,
                        final JobMasterGateway jobManagerGateway,
-                       final UUID jobLeaderId,
                        final JMTMRegistrationSuccess registrationMessage) {
-                       runAsync(new Runnable() {
-                               @Override
-                               public void run() {
+                       runAsync(
+                               () ->
                                        establishJobManagerConnection(
                                                jobId,
                                                jobManagerGateway,
-                                               jobLeaderId,
-                                               registrationMessage);
-                               }
-                       });
+                                               registrationMessage));
                }
 
                @Override
-               public void jobManagerLostLeadership(final JobID jobId, final 
UUID jobLeaderId) {
-                       log.info("JobManager for job {} with leader id {} lost 
leadership.", jobId, jobLeaderId);
+               public void jobManagerLostLeadership(final JobID jobId, final 
JobMasterId jobMasterId) {
+                       log.info("JobManager for job {} with leader id {} lost 
leadership.", jobId, jobMasterId);
 
                        runAsync(new Runnable() {
                                @Override
@@ -1233,11 +1216,9 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
        }
 
        private final class TaskManagerActionsImpl implements 
TaskManagerActions {
-               private final UUID jobMasterLeaderId;
                private final JobMasterGateway jobMasterGateway;
 
-               private TaskManagerActionsImpl(UUID jobMasterLeaderId, 
JobMasterGateway jobMasterGateway) {
-                       this.jobMasterLeaderId = 
Preconditions.checkNotNull(jobMasterLeaderId);
+               private TaskManagerActionsImpl(JobMasterGateway 
jobMasterGateway) {
                        this.jobMasterGateway = 
Preconditions.checkNotNull(jobMasterGateway);
                }
 
@@ -1246,7 +1227,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
                        runAsync(new Runnable() {
                                @Override
                                public void run() {
-                                       
unregisterTaskAndNotifyFinalState(jobMasterLeaderId, jobMasterGateway, 
executionAttemptID);
+                                       
unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID);
                                }
                        });
                }
@@ -1273,7 +1254,7 @@ public class TaskExecutor extends RpcEndpoint implements 
TaskExecutorGateway {
 
                @Override
                public void updateTaskExecutionState(final TaskExecutionState 
taskExecutionState) {
-                       
TaskExecutor.this.updateTaskExecutionState(jobMasterLeaderId, jobMasterGateway, 
taskExecutionState);
+                       
TaskExecutor.this.updateTaskExecutionState(jobMasterGateway, 
taskExecutionState);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index fd56255..ee0f69d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -27,13 +27,13 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.taskmanager.Task;
 
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -64,13 +64,13 @@ public interface TaskExecutorGateway extends RpcGateway {
         * Submit a {@link Task} to the {@link TaskExecutor}.
         *
         * @param tdd describing the task to submit
-        * @param leaderId of the job leader
+        * @param jobMasterId identifying the submitting JobMaster
         * @param timeout of the submit operation
         * @return Future acknowledge of the successful operation
         */
        CompletableFuture<Acknowledge> submitTask(
                TaskDeploymentDescriptor tdd,
-               UUID leaderId,
+               JobMasterId jobMasterId,
                @RpcTimeout Time timeout);
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
index a919c78..baa403b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.taskexecutor.rpc;
 
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -30,27 +29,20 @@ import 
org.apache.flink.runtime.jobmaster.SerializedInputSplit;
 import org.apache.flink.util.InstantiationUtil;
 import org.apache.flink.util.Preconditions;
 
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 public class RpcInputSplitProvider implements InputSplitProvider {
-       private final UUID jobMasterLeaderId;
        private final JobMasterGateway jobMasterGateway;
-       private final JobID jobID;
        private final JobVertexID jobVertexID;
        private final ExecutionAttemptID executionAttemptID;
        private final Time timeout;
 
        public RpcInputSplitProvider(
-                       UUID jobMasterLeaderId,
                        JobMasterGateway jobMasterGateway,
-                       JobID jobID,
                        JobVertexID jobVertexID,
                        ExecutionAttemptID executionAttemptID,
                        Time timeout) {
-               this.jobMasterLeaderId = 
Preconditions.checkNotNull(jobMasterLeaderId);
                this.jobMasterGateway = 
Preconditions.checkNotNull(jobMasterGateway);
-               this.jobID = Preconditions.checkNotNull(jobID);
                this.jobVertexID = Preconditions.checkNotNull(jobVertexID);
                this.executionAttemptID = 
Preconditions.checkNotNull(executionAttemptID);
                this.timeout = Preconditions.checkNotNull(timeout);
@@ -62,7 +54,8 @@ public class RpcInputSplitProvider implements 
InputSplitProvider {
                Preconditions.checkNotNull(userCodeClassLoader);
 
                CompletableFuture<SerializedInputSplit> futureInputSplit = 
jobMasterGateway.requestNextInputSplit(
-                               jobMasterLeaderId, jobVertexID, 
executionAttemptID);
+                       jobVertexID,
+                       executionAttemptID);
 
                try {
                        SerializedInputSplit serializedInputSplit = 
futureInputSplit.get(timeout.getSize(), timeout.getUnit());

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
index 26e1b0e..f3eb717 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java
@@ -26,16 +26,13 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.util.Preconditions;
 
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
 public class RpcPartitionStateChecker implements PartitionProducerStateChecker 
{
 
-       private final UUID jobMasterLeaderId;
        private final JobMasterGateway jobMasterGateway;
 
-       public RpcPartitionStateChecker(UUID jobMasterLeaderId, 
JobMasterGateway jobMasterGateway) {
-               this.jobMasterLeaderId = 
Preconditions.checkNotNull(jobMasterLeaderId);
+       public RpcPartitionStateChecker(JobMasterGateway jobMasterGateway) {
                this.jobMasterGateway = 
Preconditions.checkNotNull(jobMasterGateway);
        }
 
@@ -45,6 +42,6 @@ public class RpcPartitionStateChecker implements 
PartitionProducerStateChecker {
                        IntermediateDataSetID resultId,
                        ResultPartitionID partitionId) {
 
-               return 
jobMasterGateway.requestPartitionState(jobMasterLeaderId, resultId, 
partitionId);
+               return jobMasterGateway.requestPartitionState(resultId, 
partitionId);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
index d898562..82a6fbc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java
@@ -29,7 +29,6 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -37,25 +36,21 @@ public class RpcResultPartitionConsumableNotifier 
implements ResultPartitionCons
 
        private static final Logger LOG = 
LoggerFactory.getLogger(RpcResultPartitionConsumableNotifier.class);
 
-       private final UUID jobMasterLeaderId;
        private final JobMasterGateway jobMasterGateway;
        private final Executor executor;
        private final Time timeout;
 
        public RpcResultPartitionConsumableNotifier(
-                       UUID jobMasterLeaderId,
                        JobMasterGateway jobMasterGateway,
                        Executor executor,
                        Time timeout) {
-               this.jobMasterLeaderId = 
Preconditions.checkNotNull(jobMasterLeaderId);
                this.jobMasterGateway = 
Preconditions.checkNotNull(jobMasterGateway);
                this.executor = Preconditions.checkNotNull(executor);
                this.timeout = Preconditions.checkNotNull(timeout);
        }
        @Override
        public void notifyPartitionConsumable(JobID jobId, ResultPartitionID 
partitionId, final TaskActions taskActions) {
-               CompletableFuture<Acknowledge> acknowledgeFuture = 
jobMasterGateway.scheduleOrUpdateConsumers(
-                               jobMasterLeaderId, partitionId, timeout);
+               CompletableFuture<Acknowledge> acknowledgeFuture = 
jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout);
 
                acknowledgeFuture.whenCompleteAsync(
                        (Acknowledge ack, Throwable throwable) -> {

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 55499f5..6013e91 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
@@ -577,7 +578,7 @@ public class ResourceManagerTest extends TestLogger {
                final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
                final ResourceID rmResourceId = ResourceID.generate();
                final ResourceManagerId rmLeaderId = 
ResourceManagerId.generate();
-               final UUID jmLeaderId = UUID.randomUUID();
+               final JobMasterId jobMasterId = JobMasterId.generate();
                final JobID jobId = new JobID();
 
                final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
@@ -590,7 +591,7 @@ public class ResourceManagerTest extends TestLogger {
                        Time.seconds(5L));
 
                final TestingLeaderElectionService rmLeaderElectionService = 
new TestingLeaderElectionService();
-               final TestingLeaderRetrievalService jmLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderId);
+               final TestingLeaderRetrievalService jmLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
                final TestingHighAvailabilityServices highAvailabilityServices 
= new TestingHighAvailabilityServices();
                
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
                highAvailabilityServices.setJobMasterLeaderRetriever(jobId, 
jmLeaderRetrievalService);
@@ -633,7 +634,7 @@ public class ResourceManagerTest extends TestLogger {
 
                        // test registration response successful and it will 
trigger monitor heartbeat target, schedule heartbeat request at interval time
                        CompletableFuture<RegistrationResponse> 
successfulFuture = rmGateway.registerJobManager(
-                               jmLeaderId,
+                               jobMasterId,
                                jmResourceId,
                                jobMasterAddress,
                                jobId,
@@ -665,7 +666,7 @@ public class ResourceManagerTest extends TestLogger {
                        // run the timeout runnable to simulate a heartbeat 
timeout
                        timeoutRunnable.run();
 
-                       verify(jobMasterGateway, 
Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(eq(jmLeaderId),
 eq(rmLeaderId), any(TimeoutException.class));
+                       verify(jobMasterGateway, 
Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(eq(rmLeaderId),
 any(TimeoutException.class));
 
                } finally {
                        rpcService.stopService();

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
index 8d613ac..9d742e2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -25,15 +25,16 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.util.clock.SystemClock;
+import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -47,7 +48,7 @@ import static org.mockito.Mockito.mock;
 /**
  * Tests for the SlotPool using a proper RPC setup.
  */
-public class SlotPoolRpcTest {
+public class SlotPoolRpcTest extends TestLogger {
 
        private static RpcService rpcService;
 
@@ -80,7 +81,7 @@ public class SlotPoolRpcTest {
                                Time.days(1), Time.days(1),
                                Time.milliseconds(100) // this is the timeout 
for the request tested here
                );
-               pool.start(UUID.randomUUID(), "foobar");
+               pool.start(JobMasterId.generate(), "foobar");
 
                CompletableFuture<SimpleSlot> future = 
pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, 
Time.days(1));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index ead453e..5993dcb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.runtime.rpc.RpcService;
@@ -39,7 +40,6 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
 
 import java.util.List;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
@@ -88,7 +88,7 @@ public class SlotPoolTest extends TestLogger {
                        assertFalse(future.isDone());
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
-                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
+                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
 
                        final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
 
@@ -125,7 +125,7 @@ public class SlotPoolTest extends TestLogger {
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
                        verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds()).times(2))
-                               .requestSlot(any(UUID.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
+                               .requestSlot(any(JobMasterId.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
 
                        final List<SlotRequest> slotRequests = 
slotRequestArgumentCaptor.getAllValues();
 
@@ -168,7 +168,7 @@ public class SlotPoolTest extends TestLogger {
                        assertFalse(future1.isDone());
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
-                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
+                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
 
                        final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
 
@@ -211,7 +211,7 @@ public class SlotPoolTest extends TestLogger {
                        assertFalse(future.isDone());
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
-                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
+                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
 
                        final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
 
@@ -266,7 +266,7 @@ public class SlotPoolTest extends TestLogger {
                        CompletableFuture<SimpleSlot> future1 = 
slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), 
DEFAULT_TESTING_PROFILE, null, timeout);
 
                        ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
-                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
+                       verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
 
                        final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
 
@@ -297,7 +297,7 @@ public class SlotPoolTest extends TestLogger {
        private static ResourceManagerGateway 
createResourceManagerGatewayMock() {
                ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
                when(resourceManagerGateway
-                       .requestSlot(any(UUID.class), any(SlotRequest.class), 
any(Time.class)))
+                       .requestSlot(any(JobMasterId.class), 
any(SlotRequest.class), any(Time.class)))
                        .thenReturn(mock(CompletableFuture.class, 
RETURNS_MOCKS));
 
                return resourceManagerGateway;
@@ -308,7 +308,7 @@ public class SlotPoolTest extends TestLogger {
                        ResourceManagerGateway resourceManagerGateway) throws 
Exception {
                final String jobManagerAddress = "foobar";
 
-               slotPool.start(UUID.randomUUID(), jobManagerAddress);
+               slotPool.start(JobMasterId.generate(), jobManagerAddress);
 
                slotPool.connectToResourceManager(resourceManagerGateway);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index 0f38db2..b4f50fb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -133,7 +133,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
                assertTrue(!jobCompletion.isJobFinished());
                assertTrue(!jobCompletion.isJobFailed());
 
-               verify(jobManager).start(any(UUID.class), any(Time.class));
+               verify(jobManager).start(any(JobMasterId.class), 
any(Time.class));
                
                runner.shutdown();
                verify(leaderElectionService).stop();
@@ -165,9 +165,9 @@ public class JobManagerRunnerMockTest extends TestLogger {
        public void testJobFinished() throws Exception {
                runner.start();
 
-               UUID leaderSessionID = UUID.randomUUID();
-               runner.grantLeadership(leaderSessionID);
-               verify(jobManager).start(eq(leaderSessionID), any(Time.class));
+               JobMasterId jobMasterId = JobMasterId.generate();
+               runner.grantLeadership(jobMasterId.toUUID());
+               verify(jobManager).start(eq(jobMasterId), any(Time.class));
                assertTrue(!jobCompletion.isJobFinished());
 
                // runner been told by JobManager that job is finished
@@ -185,9 +185,9 @@ public class JobManagerRunnerMockTest extends TestLogger {
        public void testJobFailed() throws Exception {
                runner.start();
 
-               UUID leaderSessionID = UUID.randomUUID();
-               runner.grantLeadership(leaderSessionID);
-               verify(jobManager).start(eq(leaderSessionID), any(Time.class));
+               JobMasterId jobMasterId = JobMasterId.generate();
+               runner.grantLeadership(jobMasterId.toUUID());
+               verify(jobManager).start(eq(jobMasterId), any(Time.class));
                assertTrue(!jobCompletion.isJobFinished());
 
                // runner been told by JobManager that job is failed
@@ -204,9 +204,9 @@ public class JobManagerRunnerMockTest extends TestLogger {
        public void testLeadershipRevoked() throws Exception {
                runner.start();
 
-               UUID leaderSessionID = UUID.randomUUID();
-               runner.grantLeadership(leaderSessionID);
-               verify(jobManager).start(eq(leaderSessionID), any(Time.class));
+               JobMasterId jobMasterId = JobMasterId.generate();
+               runner.grantLeadership(jobMasterId.toUUID());
+               verify(jobManager).start(eq(jobMasterId), any(Time.class));
                assertTrue(!jobCompletion.isJobFinished());
 
                runner.revokeLeadership();
@@ -219,18 +219,18 @@ public class JobManagerRunnerMockTest extends TestLogger {
        public void testRegainLeadership() throws Exception {
                runner.start();
 
-               UUID leaderSessionID = UUID.randomUUID();
-               runner.grantLeadership(leaderSessionID);
-               verify(jobManager).start(eq(leaderSessionID), any(Time.class));
+               JobMasterId jobMasterId = JobMasterId.generate();
+               runner.grantLeadership(jobMasterId.toUUID());
+               verify(jobManager).start(eq(jobMasterId), any(Time.class));
                assertTrue(!jobCompletion.isJobFinished());
 
                runner.revokeLeadership();
                verify(jobManager).suspend(any(Throwable.class), 
any(Time.class));
                assertFalse(runner.isShutdown());
 
-               UUID leaderSessionID2 = UUID.randomUUID();
-               runner.grantLeadership(leaderSessionID2);
-               verify(jobManager).start(eq(leaderSessionID2), any(Time.class));
+               JobMasterId jobMasterId2 = JobMasterId.generate();
+               runner.grantLeadership(jobMasterId2.toUUID());
+               verify(jobManager).start(eq(jobMasterId2), any(Time.class));
        }
 
        private static class TestingOnCompletionActions implements 
OnCompletionActions, FatalErrorHandler {

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 6282ea0..64cc13b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -51,16 +51,12 @@ import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.net.InetAddress;
 import java.net.URL;
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.*;
-import static org.mockito.Mockito.when;
 
 @RunWith(PowerMockRunner.class)
 @PrepareForTest(BlobLibraryCacheManager.class)
@@ -79,7 +75,7 @@ public class JobMasterTest extends TestLogger {
                final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
 
                final String jobManagerAddress = "jm";
-               final UUID jmLeaderId = UUID.randomUUID();
+               final JobMasterId jobMasterId = JobMasterId.generate();
                final ResourceID jmResourceId = new 
ResourceID(jobManagerAddress);
 
                final String taskManagerAddress = "tm";
@@ -118,7 +114,7 @@ public class JobMasterTest extends TestLogger {
                                testingFatalErrorHandler,
                                new FlinkUserCodeClassLoader(new URL[0]));
 
-                       CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jmLeaderId, testingTimeout);
+                       CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
 
                        // wait for the start to complete
                        startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
@@ -127,7 +123,7 @@ public class JobMasterTest extends TestLogger {
 
                        // register task manager will trigger monitor heartbeat 
target, schedule heartbeat request at interval time
                        CompletableFuture<RegistrationResponse> 
registrationResponse = jobMasterGateway
-                               .registerTaskManager(taskManagerAddress, 
taskManagerLocation, jmLeaderId, testingTimeout);
+                               .registerTaskManager(taskManagerAddress, 
taskManagerLocation, testingTimeout);
 
                        // wait for the completion of the registration
                        registrationResponse.get();
@@ -169,7 +165,7 @@ public class JobMasterTest extends TestLogger {
                final String resourceManagerAddress = "rm";
                final String jobManagerAddress = "jm";
                final ResourceManagerId resourceManagerId = 
ResourceManagerId.generate();
-               final UUID jmLeaderId = UUID.randomUUID();
+               final JobMasterId jobMasterId = JobMasterId.generate();
                final ResourceID rmResourceId = new 
ResourceID(resourceManagerAddress);
                final ResourceID jmResourceId = new 
ResourceID(jobManagerAddress);
                final JobGraph jobGraph = new JobGraph();
@@ -188,7 +184,7 @@ public class JobMasterTest extends TestLogger {
 
                final ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
                when(resourceManagerGateway.registerJobManager(
-                       any(UUID.class),
+                       any(JobMasterId.class),
                        any(ResourceID.class),
                        anyString(),
                        any(JobID.class),
@@ -219,7 +215,7 @@ public class JobMasterTest extends TestLogger {
                                testingFatalErrorHandler,
                                new FlinkUserCodeClassLoader(new URL[0]));
 
-                       CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jmLeaderId, testingTimeout);
+                       CompletableFuture<Acknowledge> startFuture = 
jobMaster.start(jobMasterId, testingTimeout);
 
                        // wait for the start operation to complete
                        startFuture.get(testingTimeout.toMilliseconds(), 
TimeUnit.MILLISECONDS);
@@ -229,7 +225,7 @@ public class JobMasterTest extends TestLogger {
 
                        // register job manager success will trigger monitor 
heartbeat target between jm and rm
                        verify(resourceManagerGateway, 
timeout(testingTimeout.toMilliseconds())).registerJobManager(
-                               eq(jmLeaderId),
+                               eq(jobMasterId),
                                eq(jmResourceId),
                                anyString(),
                                eq(jobGraph.getJobID()),

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
index 7b8703e..fb5ee8b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -62,7 +63,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
        public void testAddingJob() throws Exception {
                final JobID jobId = new JobID();
                final String address = "foobar";
-               final UUID leaderId = UUID.randomUUID();
+               final JobMasterId leaderId = JobMasterId.generate();
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
                        null,
@@ -83,10 +84,10 @@ public class JobLeaderIdServiceTest extends TestLogger {
 
                jobLeaderIdService.addJob(jobId);
 
-               CompletableFuture<UUID> leaderIdFuture = 
jobLeaderIdService.getLeaderId(jobId);
+               CompletableFuture<JobMasterId> leaderIdFuture = 
jobLeaderIdService.getLeaderId(jobId);
 
                // notify the leader id service about the new leader
-               leaderRetrievalService.notifyListener(address, leaderId);
+               leaderRetrievalService.notifyListener(address, 
leaderId.toUUID());
 
                assertEquals(leaderId, leaderIdFuture.get());
 
@@ -117,7 +118,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
 
                jobLeaderIdService.addJob(jobId);
 
-               CompletableFuture<UUID> leaderIdFuture = 
jobLeaderIdService.getLeaderId(jobId);
+               CompletableFuture<JobMasterId> leaderIdFuture = 
jobLeaderIdService.getLeaderId(jobId);
 
                // remove the job before we could find a leader
                jobLeaderIdService.removeJob(jobId);
@@ -183,7 +184,7 @@ public class JobLeaderIdServiceTest extends TestLogger {
        public void jobTimeoutAfterLostLeadership() throws Exception {
                final JobID jobId = new JobID();
                final String address = "foobar";
-               final UUID leaderId = UUID.randomUUID();
+               final JobMasterId leaderId = JobMasterId.generate();
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                TestingLeaderRetrievalService leaderRetrievalService = new 
TestingLeaderRetrievalService(
                        null,
@@ -228,10 +229,10 @@ public class JobLeaderIdServiceTest extends TestLogger {
 
                jobLeaderIdService.addJob(jobId);
 
-               CompletableFuture<UUID> leaderIdFuture = 
jobLeaderIdService.getLeaderId(jobId);
+               CompletableFuture<JobMasterId> leaderIdFuture = 
jobLeaderIdService.getLeaderId(jobId);
 
                // notify the leader id service about the new leader
-               leaderRetrievalService.notifyListener(address, leaderId);
+               leaderRetrievalService.notifyListener(address, 
leaderId.toUUID());
 
                assertEquals(leaderId, leaderIdFuture.get());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 1de3284..156bc73 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
@@ -46,7 +47,6 @@ import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 
-import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
@@ -78,16 +78,16 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
        public void testRegisterJobMaster() throws Exception {
                String jobMasterAddress = "/jobMasterAddress1";
                JobID jobID = mockJobMaster(jobMasterAddress);
-               UUID jmLeaderID = UUID.randomUUID();
+               JobMasterId jobMasterId = JobMasterId.generate();
                final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
                final ResourceManager<?> resourceManager = 
createAndStartResourceManager(mock(LeaderElectionService.class), jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final ResourceManagerGateway rmGateway = 
resourceManager.getSelfGateway(ResourceManagerGateway.class);
 
                // test response successful
                CompletableFuture<RegistrationResponse> successfulFuture = 
rmGateway.registerJobManager(
-                       jmLeaderID,
+                       jobMasterId,
                        jmResourceId,
                        jobMasterAddress,
                        jobID,
@@ -107,9 +107,9 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
        public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws 
Exception {
                String jobMasterAddress = "/jobMasterAddress1";
                JobID jobID = mockJobMaster(jobMasterAddress);
-               UUID jmLeaderID = UUID.randomUUID();
+               JobMasterId jobMasterId = JobMasterId.generate();
                final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
-               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID);
+               TestingLeaderRetrievalService jobMasterLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID());
                TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
                final ResourceManager<?> resourceManager = 
createAndStartResourceManager(mock(LeaderElectionService.class), jobID, 
jobMasterLeaderRetrievalService, testingFatalErrorHandler);
                final ResourceManagerGateway wronglyFencedGateway = 
rpcService.connect(resourceManager.getAddress(), ResourceManagerId.generate(), 
ResourceManagerGateway.class)
@@ -117,7 +117,7 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
 
                // test throw exception when receive a registration from job 
master which takes unmatched leaderSessionId
                CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = 
wronglyFencedGateway.registerJobManager(
-                       jmLeaderID,
+                       jobMasterId,
                        jmResourceId,
                        jobMasterAddress,
                        jobID,
@@ -152,9 +152,9 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                final ResourceID jmResourceId = new 
ResourceID(jobMasterAddress);
 
                // test throw exception when receive a registration from job 
master which takes unmatched leaderSessionId
-               UUID differentLeaderSessionID = UUID.randomUUID();
+               JobMasterId differentJobMasterId = JobMasterId.generate();
                CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = 
rmGateway.registerJobManager(
-                       differentLeaderSessionID,
+                       differentJobMasterId,
                        jmResourceId,
                        jobMasterAddress,
                        jobID,
@@ -185,7 +185,7 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
                // test throw exception when receive a registration from job 
master which takes invalid address
                String invalidAddress = "/jobMasterAddress2";
                CompletableFuture<RegistrationResponse> invalidAddressFuture = 
rmGateway.registerJobManager(
-                       HighAvailabilityServices.DEFAULT_LEADER_ID,
+                       new 
JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID),
                        jmResourceId,
                        invalidAddress,
                        jobID,
@@ -221,7 +221,7 @@ public class ResourceManagerJobMasterTest extends 
TestLogger {
 
                // this should fail because we try to register a job leader 
listener for an unknown job id
                CompletableFuture<RegistrationResponse> registrationFuture = 
rmGateway.registerJobManager(
-                       HighAvailabilityServices.DEFAULT_LEADER_ID,
+                       new 
JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID),
                        jmResourceId,
                        jobMasterAddress,
                        unknownJobIDToHAServices,

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
index 945cbf3..348dce6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
@@ -92,7 +93,7 @@ public class TaskExecutorITCase extends TestLogger {
                final TestingLeaderRetrievalService rmLeaderRetrievalService = 
new TestingLeaderRetrievalService(null, null);
                final String rmAddress = "rm";
                final String jmAddress = "jm";
-               final UUID jmLeaderId = UUID.randomUUID();
+               final JobMasterId jobMasterId = JobMasterId.generate();
                final ResourceID rmResourceId = new ResourceID(rmAddress);
                final ResourceID jmResourceId = new ResourceID(jmAddress);
                final JobID jobId = new JobID();
@@ -100,7 +101,7 @@ public class TaskExecutorITCase extends TestLogger {
 
                
testingHAServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
                
testingHAServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService);
-               testingHAServices.setJobMasterLeaderRetriever(jobId, new 
TestingLeaderRetrievalService(jmAddress, jmLeaderId));
+               testingHAServices.setJobMasterLeaderRetriever(jobId, new 
TestingLeaderRetrievalService(jmAddress, jobMasterId.toUUID()));
 
                TestingRpcService rpcService = new TestingRpcService();
                ResourceManagerConfiguration resourceManagerConfiguration = new 
ResourceManagerConfiguration(
@@ -162,14 +163,14 @@ public class TaskExecutorITCase extends TestLogger {
 
                JobMasterGateway jmGateway = mock(JobMasterGateway.class);
 
-               when(jmGateway.registerTaskManager(any(String.class), 
any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class)))
+               when(jmGateway.registerTaskManager(any(String.class), 
any(TaskManagerLocation.class), any(Time.class)))
                        .thenReturn(CompletableFuture.completedFuture(new 
JMTMRegistrationSuccess(taskManagerResourceId, 1234)));
                when(jmGateway.getHostname()).thenReturn(jmAddress);
                when(jmGateway.offerSlots(
                        eq(taskManagerResourceId),
                        any(Iterable.class),
-                       eq(jmLeaderId),
                        
any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
+               when(jmGateway.getFencingToken()).thenReturn(jobMasterId);
 
 
                rpcService.registerGateway(rmAddress, 
resourceManager.getSelfGateway(ResourceManagerGateway.class));
@@ -193,7 +194,7 @@ public class TaskExecutorITCase extends TestLogger {
                        rmLeaderRetrievalService.notifyListener(rmAddress, 
rmLeaderId);
 
                        CompletableFuture<RegistrationResponse> 
registrationResponseFuture = rmGateway.registerJobManager(
-                               jmLeaderId,
+                               jobMasterId,
                                jmResourceId,
                                jmAddress,
                                jobId,
@@ -203,14 +204,14 @@ public class TaskExecutorITCase extends TestLogger {
 
                        assertTrue(registrationResponse instanceof 
JobMasterRegistrationSuccess);
 
-                       CompletableFuture<Acknowledge> slotAck = 
rmGateway.requestSlot(jmLeaderId, slotRequest, timeout);
+                       CompletableFuture<Acknowledge> slotAck = 
rmGateway.requestSlot(jobMasterId, slotRequest, timeout);
 
                        slotAck.get();
 
                        verify(jmGateway, 
Mockito.timeout(timeout.toMilliseconds())).offerSlots(
                                eq(taskManagerResourceId),
                                
(Iterable<SlotOffer>)argThat(Matchers.contains(slotOffer)),
-                               eq(jmLeaderId), any(Time.class));
+                               any(Time.class));
                } finally {
                        if (testingFatalErrorHandler.hasExceptionOccurred()) {
                                testingFatalErrorHandler.rethrowError();

http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 2112c1b..7146445 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -55,6 +55,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -190,7 +191,6 @@ public class TaskExecutorTest extends TestLogger {
                when(jobMasterGateway.registerTaskManager(
                                any(String.class),
                                eq(taskManagerLocation),
-                               eq(jmLeaderId),
                                any(Time.class)
                )).thenReturn(CompletableFuture.completedFuture(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
                
when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress);
@@ -228,7 +228,7 @@ public class TaskExecutorTest extends TestLogger {
 
                        // register task manager success will trigger 
monitoring heartbeat target between tm and jm
                        verify(jobMasterGateway, 
Mockito.timeout(timeout.toMilliseconds())).registerTaskManager(
-                                       eq(taskManager.getAddress()), 
eq(taskManagerLocation), eq(jmLeaderId), any(Time.class));
+                                       eq(taskManager.getAddress()), 
eq(taskManagerLocation), any(Time.class));
 
                        // the timeout should trigger disconnecting from the 
JobManager
                        verify(jobMasterGateway, timeout(heartbeatTimeout * 
50L)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), 
any(TimeoutException.class));
@@ -657,7 +657,7 @@ public class TaskExecutorTest extends TestLogger {
                final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
                final JobID jobId = new JobID();
                final AllocationID allocationId = new AllocationID();
-               final UUID jobManagerLeaderId = UUID.randomUUID();
+               final JobMasterId jobMasterId = JobMasterId.generate();
                final JobVertexID jobVertexId = new JobVertexID();
 
                JobInformation jobInformation = new JobInformation(
@@ -694,11 +694,13 @@ public class TaskExecutorTest extends TestLogger {
                final LibraryCacheManager libraryCacheManager = 
mock(LibraryCacheManager.class);
                
when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
 
+               final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
+               
when(jobMasterGateway.getFencingToken()).thenReturn(jobMasterId);
+
                final JobManagerConnection jobManagerConnection = new 
JobManagerConnection(
                        jobId,
                        ResourceID.generate(),
-                       mock(JobMasterGateway.class),
-                       jobManagerLeaderId,
+                       jobMasterGateway,
                        mock(TaskManagerActions.class),
                        mock(CheckpointResponder.class),
                        mock(BlobCache.class),
@@ -755,7 +757,7 @@ public class TaskExecutorTest extends TestLogger {
 
                        final TaskExecutorGateway tmGateway = 
taskManager.getSelfGateway(TaskExecutorGateway.class);
 
-                       tmGateway.submitTask(tdd, jobManagerLeaderId, timeout);
+                       tmGateway.submitTask(tdd, jobMasterId, timeout);
 
                        CompletableFuture<Boolean> completionFuture = 
TestInvokable.completableFuture;
 
@@ -833,14 +835,12 @@ public class TaskExecutorTest extends TestLogger {
                when(jobMasterGateway.registerTaskManager(
                                any(String.class),
                                eq(taskManagerLocation),
-                               eq(jobManagerLeaderId),
                                any(Time.class)
                )).thenReturn(CompletableFuture.completedFuture(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
                
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
                when(jobMasterGateway.offerSlots(
                        any(ResourceID.class),
                        any(Iterable.class),
-                       any(UUID.class),
                        
any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS));
 
                rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
@@ -894,7 +894,6 @@ public class TaskExecutorTest extends TestLogger {
                        verify(jobMasterGateway, 
Mockito.timeout(timeout.toMilliseconds())).offerSlots(
                                        any(ResourceID.class),
                                        
(Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)),
-                                       eq(jobManagerLeaderId),
                                        any(Time.class));
 
                        // check if a concurrent error occurred
@@ -958,13 +957,12 @@ public class TaskExecutorTest extends TestLogger {
                when(jobMasterGateway.registerTaskManager(
                                any(String.class),
                                eq(taskManagerLocation),
-                               eq(jobManagerLeaderId),
                                any(Time.class)
                )).thenReturn(CompletableFuture.completedFuture(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
                
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
 
                when(jobMasterGateway.offerSlots(
-                               any(ResourceID.class), any(Iterable.class), 
eq(jobManagerLeaderId), any(Time.class)))
+                               any(ResourceID.class), any(Iterable.class), 
any(Time.class)))
                        
.thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)Collections.singleton(offer1)));
 
                rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
@@ -1163,10 +1161,10 @@ public class TaskExecutorTest extends TestLogger {
                final ResourceID resourceManagerResourceId = new 
ResourceID(resourceManagerAddress);
 
                final String jobManagerAddress = "jm";
-               final UUID jobManagerLeaderId = UUID.randomUUID();
+               final JobMasterId jobMasterId = JobMasterId.generate();
 
                final LeaderRetrievalService 
resourceManagerLeaderRetrievalService = new 
TestingLeaderRetrievalService(resourceManagerAddress, resourceManagerLeaderId);
-               final LeaderRetrievalService jobManagerLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobManagerAddress, jobManagerLeaderId);
+               final LeaderRetrievalService jobManagerLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobManagerAddress, jobMasterId.toUUID());
                
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
                haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetrievalService);
 
@@ -1193,13 +1191,10 @@ public class TaskExecutorTest extends TestLogger {
                when(jobMasterGateway.registerTaskManager(
                        any(String.class),
                        eq(taskManagerLocation),
-                       eq(jobManagerLeaderId),
                        any(Time.class)
                )).thenReturn(CompletableFuture.completedFuture(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
                
when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress);
-               when(jobMasterGateway.updateTaskExecutionState(
-                       any(UUID.class),
-                       
any(TaskExecutionState.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
+               
when(jobMasterGateway.updateTaskExecutionState(any(TaskExecutionState.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get()));
 
 
                rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
@@ -1212,7 +1207,6 @@ public class TaskExecutorTest extends TestLogger {
                        jobId,
                        jmResourceId,
                        jobMasterGateway,
-                       jobManagerLeaderId,
                        mock(TaskManagerActions.class),
                        mock(CheckpointResponder.class),
                        mock(BlobCache.class),
@@ -1297,7 +1291,6 @@ public class TaskExecutorTest extends TestLogger {
                                jobMasterGateway.offerSlots(
                                        any(ResourceID.class),
                                        any(Iterable.class),
-                                       eq(jobManagerLeaderId),
                                        any(Time.class)))
                                .thenReturn(offerResultFuture);
 
@@ -1305,10 +1298,10 @@ public class TaskExecutorTest extends TestLogger {
                        // been properly started. This will also offer the 
slots to the job master
                        jobLeaderService.addJob(jobId, jobManagerAddress);
 
-                       verify(jobMasterGateway, 
Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), 
any(Iterable.class), eq(jobManagerLeaderId), any(Time.class));
+                       verify(jobMasterGateway, 
Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), 
any(Iterable.class), any(Time.class));
 
                        // submit the task without having acknowledge the 
offered slots
-                       tmGateway.submitTask(tdd, jobManagerLeaderId, timeout);
+                       tmGateway.submitTask(tdd, jobMasterId, timeout);
 
                        // acknowledge the offered slots
                        
offerResultFuture.complete(Collections.singleton(offer1));
@@ -1351,7 +1344,6 @@ public class TaskExecutorTest extends TestLogger {
                final JobID jobId = new JobID();
                final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
                when(jobMasterGateway.getHostname()).thenReturn("localhost");
-               final UUID jobLeaderId = UUID.randomUUID();
                final JMTMRegistrationSuccess registrationMessage = new 
JMTMRegistrationSuccess(ResourceID.generate(), 1);
                final JobManagerTable jobManagerTableMock = spy(new 
JobManagerTable());
 
@@ -1382,10 +1374,10 @@ public class TaskExecutorTest extends TestLogger {
 
                        JobLeaderListener taskExecutorListener = 
jobLeaderListenerArgumentCaptor.getValue();
 
-                       taskExecutorListener.jobManagerGainedLeadership(jobId, 
jobMasterGateway, jobLeaderId, registrationMessage);
+                       taskExecutorListener.jobManagerGainedLeadership(jobId, 
jobMasterGateway, registrationMessage);
 
                        // duplicate job manager gained leadership message
-                       taskExecutorListener.jobManagerGainedLeadership(jobId, 
jobMasterGateway, jobLeaderId, registrationMessage);
+                       taskExecutorListener.jobManagerGainedLeadership(jobId, 
jobMasterGateway, registrationMessage);
 
                        ArgumentCaptor<JobManagerConnection> 
jobManagerConnectionArgumentCaptor = 
ArgumentCaptor.forClass(JobManagerConnection.class);
 

Reply via email to