[FLINK-4343] [tm] Implement TM -> JM registration logic

Upon requesting a slot for a new job, the TaskManager registers this job at the
JobLeaderService. The job leader service is responsible for monitoring job
leader changes for all registered jobs. In case of a new job leader, the
service will try to establish a connection to the new job leader. Once the
connection has been established, the task manager is informed about it. The
task manager will then offer all allocated but not yet active slots to the new
job leader.

Implement JobLeaderService

The JobLeaderService is responsible for establishing a connection to the JM
leader of a given job.

Disable TaskExecutorTest#testRejectAllocationRequestsForOutOfSyncSlots

Add simple task submission test

Add job leader detection test case

Add task slot acceptance test

Fix RpcCompletenessTest

Add comments

This closes #2640.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/92880056
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/92880056
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/92880056

Branch: refs/heads/flip-6
Commit: 928800569234156876b9744de064cc359f121664
Parents: 9da76dc
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Oct 5 17:02:06 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sun Oct 16 10:46:09 2016 +0200

----------------------------------------------------------------------
 .../org/apache/flink/util/ReflectionUtil.java   | 110 ++++
 .../deployment/TaskDeploymentDescriptor.java    |  11 +-
 .../runtime/executiongraph/ExecutionVertex.java |   2 +
 .../HighAvailabilityServices.java               |   3 +-
 .../runtime/highavailability/NonHaServices.java |   4 +-
 .../highavailability/ZookeeperHaServices.java   |   2 +-
 .../jobmaster/JMTMRegistrationSuccess.java      |  45 ++
 .../flink/runtime/jobmaster/JobMaster.java      |  19 +
 .../runtime/jobmaster/JobMasterGateway.java     |  36 ++
 .../registration/RegisteredRpcConnection.java   |   2 +-
 .../resourcemanager/ResourceManager.java        |   2 +-
 .../slotmanager/SlotManager.java                |   8 +-
 .../runtime/taskexecutor/JobLeaderListener.java |  60 +++
 .../runtime/taskexecutor/JobLeaderService.java  | 390 ++++++++++++++
 .../taskexecutor/JobManagerConnection.java      |  23 +-
 .../runtime/taskexecutor/JobManagerTable.java   |  59 +++
 .../runtime/taskexecutor/TaskExecutor.java      | 522 ++++++++++++++-----
 .../taskexecutor/TaskExecutorGateway.java       |  25 +-
 ...TaskExecutorToResourceManagerConnection.java |   5 +
 .../runtime/taskexecutor/TaskManagerRunner.java |   2 +
 .../taskexecutor/TaskManagerServices.java       |  24 +-
 .../exceptions/SlotAllocationException.java     |  39 ++
 .../taskexecutor/slot/TaskSlotTable.java        |  39 +-
 .../apache/flink/runtime/taskmanager/Task.java  |   2 +-
 .../TaskDeploymentDescriptorTest.java           |   6 +-
 .../TestingHighAvailabilityServices.java        |   2 +-
 .../metrics/groups/TaskManagerGroupTest.java    |  10 +-
 .../slotmanager/SlotManagerTest.java            |   2 +-
 .../slotmanager/SlotProtocolTest.java           |   8 +-
 .../flink/runtime/rpc/RpcCompletenessTest.java  |  29 +-
 .../runtime/taskexecutor/TaskExecutorTest.java  | 431 ++++++++++++++-
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   3 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  75 +--
 .../flink/runtime/taskmanager/TaskStopTest.java |   2 +-
 .../flink/runtime/taskmanager/TaskTest.java     |  23 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |  38 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |  25 +-
 37 files changed, 1805 insertions(+), 283 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
index b851eba..2883570 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ReflectionUtil.java
@@ -23,6 +23,9 @@ import org.apache.flink.annotation.Internal;
 
 import java.lang.reflect.ParameterizedType;
 import java.lang.reflect.Type;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
 
 @Internal
 public final class ReflectionUtil {
@@ -151,6 +154,113 @@ public final class ReflectionUtil {
        }
 
        /**
+        * Extract the full template type information from the given type's 
template parameter at the
+        * given position.
+        *
+        * @param type type to extract the full template parameter information 
from
+        * @param templatePosition describing at which position the template 
type parameter is
+        * @return Full type information describing the template parameter's 
type
+        */
+       public static FullTypeInfo getFullTemplateType(Type type, int 
templatePosition) {
+               if (type instanceof ParameterizedType) {
+                       return getFullTemplateType(((ParameterizedType) 
type).getActualTypeArguments()[templatePosition]);
+               } else {
+                       throw new IllegalArgumentException();
+               }
+       }
+
+       /**
+        * Extract the full type information from the given type.
+        *
+        * @param type to be analyzed
+        * @return Full type information describing the given type
+        */
+       public static FullTypeInfo getFullTemplateType(Type type) {
+               if (type instanceof ParameterizedType) {
+                       ParameterizedType parameterizedType = 
(ParameterizedType) type;
+
+                       FullTypeInfo[] templateTypeInfos = new 
FullTypeInfo[parameterizedType.getActualTypeArguments().length];
+
+                       for (int i = 0; i < 
parameterizedType.getActualTypeArguments().length; i++) {
+                               templateTypeInfos[i] = 
getFullTemplateType(parameterizedType.getActualTypeArguments()[i]);
+                       }
+
+                       return new 
FullTypeInfo((Class<?>)parameterizedType.getRawType(), templateTypeInfos);
+               } else {
+                       return new FullTypeInfo((Class<?>) type, null);
+               }
+       }
+
+       /**
+        * Container for the full type information of a type. This means that 
it contains the
+        * {@link Class} object and for each template parameter it contains a 
full type information
+        * describing the type.
+        */
+       public static class FullTypeInfo {
+               private final Class<?> clazz;
+               private final FullTypeInfo[] templateTypeInfos;
+
+
+               public FullTypeInfo(Class<?> clazz, FullTypeInfo[] 
templateTypeInfos) {
+                       this.clazz = Preconditions.checkNotNull(clazz);
+                       this.templateTypeInfos = templateTypeInfos;
+               }
+
+               public Class<?> getClazz() {
+                       return clazz;
+               }
+
+               public FullTypeInfo[] getTemplateTypeInfos() {
+                       return templateTypeInfos;
+               }
+
+               public Iterator<Class<?>> getClazzIterator() {
+                       UnionIterator<Class<?>> unionIterator = new 
UnionIterator<>();
+
+                       
unionIterator.add(Collections.<Class<?>>singleton(clazz).iterator());
+
+                       if (templateTypeInfos != null) {
+                               for (int i = 0; i < templateTypeInfos.length; 
i++) {
+                                       
unionIterator.add(templateTypeInfos[i].getClazzIterator());
+                               }
+                       }
+
+                       return unionIterator;
+               }
+
+               @Override
+               public String toString() {
+                       StringBuilder builder = new StringBuilder();
+
+                       builder.append(clazz.getSimpleName());
+
+                       if (templateTypeInfos != null) {
+                               builder.append("<");
+
+                               for (int i = 0; i < templateTypeInfos.length - 
1; i++) {
+                                       
builder.append(templateTypeInfos[i]).append(", ");
+                               }
+
+                               
builder.append(templateTypeInfos[templateTypeInfos.length - 1]);
+                               builder.append(">");
+                       }
+
+                       return builder.toString();
+               }
+
+               @Override
+               public boolean equals(Object obj) {
+                       if (obj instanceof FullTypeInfo) {
+                               FullTypeInfo other = (FullTypeInfo) obj;
+
+                               return clazz == other.getClazz() && 
Arrays.equals(templateTypeInfos, other.getTemplateTypeInfos());
+                       } else {
+                               return false;
+                       }
+               }
+       }
+
+       /**
         * Private constructor to prevent instantiation.
         */
        private ReflectionUtil() {

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index b1ac665..884d632 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -59,7 +59,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
        private final ExecutionAttemptID executionId;
 
        /** The allocation ID of the slot in which the task shall be run */
-       private final AllocationID allocationID;
+       private final AllocationID allocationId;
 
        /** The task's name. */
        private final String taskName;
@@ -115,6 +115,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
         */
        public TaskDeploymentDescriptor(
                JobID jobID,
+               AllocationID allocationId,
                String jobName,
                JobVertexID vertexID,
                ExecutionAttemptID executionId,
@@ -142,6 +143,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                checkArgument(attemptNumber >= 0);
 
                this.jobID = checkNotNull(jobID);
+               this.allocationId = checkNotNull(allocationId);
                this.jobName = checkNotNull(jobName);
                this.vertexID = checkNotNull(vertexID);
                this.executionId = checkNotNull(executionId);
@@ -162,11 +164,11 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                this.operatorState = operatorState;
                this.keyGroupState = keyGroupState;
                this.partitionableOperatorState = 
partitionableOperatorStateHandles;
-               this.allocationID = new AllocationID();
        }
 
        public TaskDeploymentDescriptor(
                JobID jobID,
+               AllocationID allocationId,
                String jobName,
                JobVertexID vertexID,
                ExecutionAttemptID executionId,
@@ -187,6 +189,7 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
 
                this(
                        jobID,
+                       allocationId,
                        jobName,
                        vertexID,
                        executionId,
@@ -327,8 +330,8 @@ public final class TaskDeploymentDescriptor implements 
Serializable {
                return requiredClasspaths;
        }
 
-       public AllocationID getAllocationID() {
-               return allocationID;
+       public AllocationID getAllocationId() {
+               return allocationId;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 4837803..708a72c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.executiongraph;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -666,6 +667,7 @@ public class ExecutionVertex {
 
                return new TaskDeploymentDescriptor(
                        getJobId(),
+                       new AllocationID(), // TODO: Obtain the proper 
allocation id from the slot
                        getExecutionGraph().getJobName(),
                        getJobvertexId(),
                        executionId,

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 5d78ffc..484cddb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -49,10 +49,11 @@ public interface HighAvailabilityServices {
         * Gets the leader retriever for the job JobMaster which is responsible 
for the given job
         *
         * @param jobID The identifier of the job.
+        * @param defaultAddress address under which the job manager is 
reachable
         * @return
         * @throws Exception
         */
-       LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) throws 
Exception;
+       LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, String 
defaultAddress) throws Exception;
 
        /**
         * Gets the leader election service for the cluster's resource manager.

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index d7fd2bf..1c73c01 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -82,8 +82,8 @@ public class NonHaServices implements 
HighAvailabilityServices {
        }
 
        @Override
-       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
throws Exception {
-               return new 
StandaloneLeaderRetrievalService(jobMastersAddress.get(jobID), new UUID(0, 0));
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, 
String defaultAddress) throws Exception {
+               return new StandaloneLeaderRetrievalService(defaultAddress, new 
UUID(0, 0));
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
index 3a7736b..bbe8ecb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/ZookeeperHaServices.java
@@ -111,7 +111,7 @@ public class ZookeeperHaServices implements 
HighAvailabilityServices {
        }
 
        @Override
-       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
throws Exception {
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, 
String defaultAddress) throws Exception {
                return ZooKeeperUtils.createLeaderRetrievalService(client, 
configuration, getPathForJobManager(jobID));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JMTMRegistrationSuccess.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JMTMRegistrationSuccess.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JMTMRegistrationSuccess.java
new file mode 100644
index 0000000..7272cd4
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JMTMRegistrationSuccess.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.util.Preconditions;
+
+public class JMTMRegistrationSuccess extends RegistrationResponse.Success {
+       private static final long serialVersionUID = -3528383155961318929L;
+
+       private final ResourceID resourceID;
+       private final int blobPort;
+
+       public JMTMRegistrationSuccess(ResourceID resourceID, int blobPort) {
+               Preconditions.checkArgument(0 < blobPort && 65536 > blobPort, 
"The blob port has to be 0 < blobPort < 65536.");
+
+               this.resourceID = Preconditions.checkNotNull(resourceID);
+               this.blobPort = blobPort;
+       }
+
+       public ResourceID getResourceID() {
+               return resourceID;
+       }
+
+       public int getBlobPort() {
+               return blobPort;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index e11f3a1..a7be476 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.client.JobExecutionException;
 import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.client.SerializedJobExecutionResult;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
@@ -646,6 +647,24 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                executionGraph.getRequiredClasspaths());
        }
 
+       @RpcMethod
+       public Iterable<AllocationID> offerSlots(final Iterable<AllocationID> 
slots, UUID leaderId) {
+               throw new UnsupportedOperationException("Has to be 
implemented.");
+       }
+
+       @RpcMethod
+       public void failSlot(final AllocationID allocationId, UUID leaderId, 
Exception cause) {
+               throw new UnsupportedOperationException("Has to be 
implemented.");
+       }
+
+       @RpcMethod
+       public RegistrationResponse registerTaskManager(
+               final String taskManagerAddress,
+               final ResourceID taskManagerProcessId,
+               final UUID leaderId) {
+               throw new UnsupportedOperationException("Has to be 
implemented.");
+       }
+
        
//----------------------------------------------------------------------------------------------
        // Internal methods
        
//----------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
index b27b41c..0f155a4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.jobmaster;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorGateway;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -35,6 +36,7 @@ import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.query.KvStateID;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateServerAddress;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KvState;
@@ -170,4 +172,38 @@ public interface JobMasterGateway extends 
CheckpointCoordinatorGateway {
         * Request the classloading props of this job.
         */
        Future<ClassloadingProps> requestClassloadingProps();
+
+       /**
+        * Offer the given slots to the job manager. The response contains the 
set of accepted slots.
+        *
+        * @param slots to offer to the job manager
+        * @param leaderId identifying the job leader
+        * @param timeout for the rpc call
+        * @return Future set of accepted slots.
+        */
+       Future<Iterable<AllocationID>> offerSlots(final Iterable<AllocationID> 
slots, UUID leaderId, @RpcTimeout final Time timeout);
+
+       /**
+        * Fail the slot with the given allocation id and cause.
+        *
+        * @param allocationId identifying the slot to fail
+        * @param leaderId identifying the job leader
+        * @param cause of the failing
+        */
+       void failSlot(final AllocationID allocationId, UUID leaderId, Exception 
cause);
+
+       /**
+        * Register the task manager at the job manager.
+        *
+        * @param taskManagerAddress address of the task manager
+        * @param taskManagerProcessId identifying the task manager
+        * @param leaderId identifying the job leader
+        * @param timeout for the rpc call
+        * @return Future registration response indicating whether the 
registration was successful or not
+        */
+       Future<RegistrationResponse> registerTaskManager(
+               final String taskManagerAddress,
+               final ResourceID taskManagerProcessId,
+               final UUID leaderId,
+               @RpcTimeout final Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
index 76093b0..78d4dbc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -35,7 +35,7 @@ import static org.apache.flink.util.Preconditions.checkState;
 /**
  * This utility class implements the basis of RPC connecting from one 
component to another component,
  * for example the RPC connection from TaskExecutor to ResourceManager.
- * This {@code RegisteredRpcConnection} implements registration and get target 
gateway .
+ * This {@code RegisteredRpcConnection} implements registration and get target 
gateway.
  *
  * <p>The registration gives access to a future that is completed upon 
successful registration.
  * The RPC connection can be closed, for example when the target where it 
tries to register

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 3122804..6f6d525 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -188,7 +188,7 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                } else {
                        try {
                                LeaderRetrievalService jobMasterLeaderRetriever 
=
-                                       
highAvailabilityServices.getJobManagerLeaderRetriever(jobID);
+                                       
highAvailabilityServices.getJobManagerLeaderRetriever(jobID, jobMasterAddress);
                                jobIdLeaderListener = new 
JobIdLeaderListener(jobID, jobMasterLeaderRetriever);
                        } catch (Exception e) {
                                log.warn("Failed to start 
JobMasterLeaderRetriever for JobID {}", jobID);

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index e312ea2..f055971 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -298,7 +298,13 @@ public abstract class SlotManager {
                final TaskExecutorRegistration registration = 
freeSlot.getTaskExecutorRegistration();
                final Future<TMSlotRequestReply> slotRequestReplyFuture =
                        registration.getTaskExecutorGateway()
-                               .requestSlot(freeSlot.getSlotId(), 
allocationID, rmServices.getLeaderID(), timeout);
+                               .requestSlot(
+                                       freeSlot.getSlotId(),
+                                       slotRequest.getJobId(),
+                                       allocationID,
+                                       "foobar", // TODO: set proper JM address
+                                       rmServices.getLeaderID(),
+                                       timeout);
 
                slotRequestReplyFuture.handleAsync(new 
BiFunction<TMSlotRequestReply, Throwable, Void>() {
                        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java
new file mode 100644
index 0000000..f02a8c2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderListener.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+
+import java.util.UUID;
+
+/**
+ * Listener for the {@link JobLeaderService}. The listener is notified whenever a job manager
+ * gained leadership for a registered job and the service could establish a connection to it.
+ * Furthermore, the listener is notified when a job manager loses leadership for a job. In case
+ * of an error, {@link #handleError(Throwable)} is called.
+ */
+public interface JobLeaderListener {
+
+       /**
+        * Callback if a job manager has gained leadership for the job identified by the job id and a
+        * connection could be established to this job manager.
+        *
+        * @param jobId identifying the job for which the job manager has gained leadership
+        * @param jobManagerGateway to the job leader
+        * @param jobLeaderId new leader id of the job leader
+        * @param registrationMessage containing further registration information
+        */
+       void jobManagerGainedLeadership(JobID jobId, JobMasterGateway jobManagerGateway, UUID jobLeaderId, JMTMRegistrationSuccess registrationMessage);
+
+       /**
+        * Callback if the job leader for the job with the given job id lost its leadership.
+        *
+        * @param jobId identifying the job whose leader has lost leadership
+        * @param jobLeaderId old leader id; may be {@code null} if the {@link JobLeaderService} had
+        *                    not detected a leader for the job before
+        */
+       void jobManagerLostLeadership(JobID jobId, UUID jobLeaderId);
+
+       /**
+        * Callback for errors which might occur in the {@link JobLeaderService}, e.g. when the leader
+        * retrieval fails or the registration at the job leader fails.
+        *
+        * @param throwable cause
+        */
+       void handleError(Throwable throwable);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
new file mode 100644
index 0000000..9e71349
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java
@@ -0,0 +1,390 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.runtime.registration.RegisteredRpcConnection;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.registration.RetryingRegistration;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+/**
+ * This service has the responsibility to monitor the job leaders (the job manager which is leader
+ * for a given job) for all registered jobs. Upon gaining leadership for a job and detection by the
+ * job leader service, the service tries to establish a connection to the job leader. After
+ * successfully establishing a connection, the job leader listener is notified about the new job
+ * leader and its connection. In case that a job leader loses leadership, the job leader listener
+ * is notified as well.
+ *
+ * <p>The service moves from CREATED to STARTED via {@link #start} and ends in STOPPED via
+ * {@link #stop()}. Jobs can only be added, removed or queried while the service is STARTED.
+ */
+public class JobLeaderService {
+
+       private static final Logger LOG = LoggerFactory.getLogger(JobLeaderService.class);
+
+       /** Process id of the owning process */
+       private final ResourceID ownerProcessId;
+
+       /** The leader retrieval service and listener for each registered job */
+       private final Map<JobID, Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener>> jobLeaderServices;
+
+       /** Internal state of the service */
+       private volatile JobLeaderService.State state;
+
+       /** Address of the owner of this service. This address is used for the job manager connection */
+       private String ownerAddress;
+
+       /** Rpc service to use for establishing connections */
+       private RpcService rpcService;
+
+       /** High availability services to create the leader retrieval services from */
+       private HighAvailabilityServices highAvailabilityServices;
+
+       /** Job leader listener listening for job leader changes */
+       private JobLeaderListener jobLeaderListener;
+
+       /**
+        * Creates a job leader service in state CREATED. It has to be started via {@link #start}
+        * before jobs can be added.
+        *
+        * @param ownerProcessId process id of the owning process, sent along when registering at job managers
+        */
+       public JobLeaderService(ResourceID ownerProcessId) {
+               this.ownerProcessId = Preconditions.checkNotNull(ownerProcessId);
+
+               jobLeaderServices = new HashMap<>(4);
+
+               state = JobLeaderService.State.CREATED;
+
+               ownerAddress = null;
+               rpcService = null;
+               highAvailabilityServices = null;
+               jobLeaderListener = null;
+       }
+
+       // -------------------------------------------------------------------------------
+       // Methods
+       // -------------------------------------------------------------------------------
+
+       /**
+        * Start the job leader service with the given services.
+        *
+        * @param initialOwnerAddress to be used for establishing connections (source address)
+        * @param initialRpcService to be used to create rpc connections
+        * @param initialHighAvailabilityServices to create leader retrieval services for the different jobs
+        * @param initialJobLeaderListener listening for job leader changes
+        * @throws IllegalStateException if the service is not in state CREATED anymore
+        */
+       public void start(
+               final String initialOwnerAddress,
+               final RpcService initialRpcService,
+               final HighAvailabilityServices initialHighAvailabilityServices,
+               final JobLeaderListener initialJobLeaderListener) {
+
+               if (JobLeaderService.State.CREATED != state) {
+                       throw new IllegalStateException("The service has already been started.");
+               } else {
+                       LOG.info("Start job leader service.");
+
+                       this.ownerAddress = Preconditions.checkNotNull(initialOwnerAddress);
+                       this.rpcService = Preconditions.checkNotNull(initialRpcService);
+                       this.highAvailabilityServices = Preconditions.checkNotNull(initialHighAvailabilityServices);
+                       this.jobLeaderListener = Preconditions.checkNotNull(initialJobLeaderListener);
+                       state = JobLeaderService.State.STARTED;
+               }
+       }
+
+       /**
+        * Stop the job leader services. This implies stopping all leader retrieval services for the
+        * different jobs and their leader retrieval listeners.
+        *
+        * <p>Every registered job is stopped even if stopping another one fails, and the service
+        * ends up in state STOPPED in any case. The first failure is rethrown afterwards; further
+        * failures are attached to it as suppressed exceptions.
+        *
+        * @throws Exception if an error occurs while stopping the service
+        */
+       public void stop() throws Exception {
+               LOG.info("Stop job leader service.");
+
+               Exception exception = null;
+
+               if (JobLeaderService.State.STARTED == state) {
+
+                       for (Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener> leaderRetrievalServiceEntry: jobLeaderServices.values()) {
+                               try {
+                                       stopJobLeaderMonitoring(leaderRetrievalServiceEntry);
+                               } catch (Exception e) {
+                                       // keep stopping the remaining jobs instead of leaking them
+                                       if (exception == null) {
+                                               exception = e;
+                                       } else {
+                                               exception.addSuppressed(e);
+                                       }
+                               }
+                       }
+
+                       jobLeaderServices.clear();
+               }
+
+               state = JobLeaderService.State.STOPPED;
+
+               if (exception != null) {
+                       throw exception;
+               }
+       }
+
+       /**
+        * Check whether the service monitors the given job.
+        *
+        * @param jobId identifying the job
+        * @return True if the given job is monitored; otherwise false
+        * @throws IllegalStateException if the service is not running
+        */
+       public boolean containsJob(JobID jobId) {
+               Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running.");
+
+               return jobLeaderServices.containsKey(jobId);
+       }
+
+       /**
+        * Remove the given job from being monitored by the job leader service.
+        *
+        * @param jobId identifying the job to remove from monitoring
+        * @throws Exception if an error occurred while stopping the leader retrieval service and listener
+        */
+       public void removeJob(JobID jobId) throws Exception {
+               Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running.");
+
+               Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener> entry = jobLeaderServices.remove(jobId);
+
+               if (entry != null) {
+                       LOG.info("Remove job {} from job leader monitoring.", jobId);
+
+                       stopJobLeaderMonitoring(entry);
+               }
+       }
+
+       /**
+        * Add the given job to be monitored. This means that the service tries to detect leaders for
+        * this job and then tries to establish a connection to it.
+        *
+        * <p>If the job is already monitored, its previous leader retrieval service and listener are
+        * stopped and replaced, so that at most one of them is active per job.
+        *
+        * @param jobId identifying the job to monitor
+        * @param defaultTargetAddress of the job leader
+        * @throws Exception if an error occurs while stopping the previous or starting the new leader
+        *                   retrieval service
+        */
+       public void addJob(final JobID jobId, final String defaultTargetAddress) throws Exception {
+               Preconditions.checkState(JobLeaderService.State.STARTED == state, "The service is currently not running.");
+
+               LOG.info("Add job {} for job leader monitoring.", jobId);
+
+               // Without this, re-adding a job would overwrite the map entry and leak the previous
+               // retrieval service and listener, which would keep reacting to leader changes.
+               final Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener> previousEntry = jobLeaderServices.remove(jobId);
+
+               if (previousEntry != null) {
+                       LOG.debug("Job {} is already monitored. Replacing its job leader monitoring.", jobId);
+
+                       stopJobLeaderMonitoring(previousEntry);
+               }
+
+               final LeaderRetrievalService leaderRetrievalService = highAvailabilityServices.getJobManagerLeaderRetriever(
+                       jobId,
+                       defaultTargetAddress);
+
+               JobLeaderService.JobManagerLeaderListener jobManagerLeaderListener = new JobManagerLeaderListener(jobId);
+
+               leaderRetrievalService.start(jobManagerLeaderListener);
+
+               jobLeaderServices.put(jobId, Tuple2.of(leaderRetrievalService, jobManagerLeaderListener));
+       }
+
+       /**
+        * Stops the leader listener and the leader retrieval service of a monitored job. The listener
+        * is stopped first, so that its connection to the job leader is closed and later leader
+        * notifications are ignored even if stopping the retrieval service fails.
+        *
+        * @param entry leader retrieval service and listener of the job
+        * @throws Exception if stopping the leader retrieval service fails
+        */
+       private static void stopJobLeaderMonitoring(
+                       Tuple2<LeaderRetrievalService, JobLeaderService.JobManagerLeaderListener> entry) throws Exception {
+               entry.f1.stop();
+               entry.f0.stop();
+       }
+
+       /**
+        * Leader listener which tries to establish a connection to a newly detected job leader.
+        */
+       private final class JobManagerLeaderListener implements LeaderRetrievalListener {
+
+               /** Job id identifying the job to look for a leader */
+               private final JobID jobId;
+
+               /** Rpc connection to the job leader; null if there is no current connection attempt */
+               private RegisteredRpcConnection<JobMasterGateway, JMTMRegistrationSuccess> rpcConnection;
+
+               /** State of the listener */
+               private volatile boolean stopped;
+
+               /** Leader id of the current job leader */
+               private volatile UUID currentLeaderId;
+
+               private JobManagerLeaderListener(JobID jobId) {
+                       this.jobId = Preconditions.checkNotNull(jobId);
+
+                       stopped = false;
+                       rpcConnection = null;
+                       currentLeaderId = null;
+               }
+
+               /**
+                * Stops the listener and closes the connection to the job leader, if any. Leader
+                * notifications received afterwards are ignored.
+                */
+               public void stop() {
+                       stopped = true;
+
+                       if (rpcConnection != null) {
+                               rpcConnection.close();
+                       }
+               }
+
+               @Override
+               public void notifyLeaderAddress(final String leaderAddress, final UUID leaderId) {
+                       if (stopped) {
+                               LOG.debug("{}'s leader retrieval listener reported a new leader for job {}. " +
+                                       "However, the service is no longer running.", JobLeaderService.class.getSimpleName(), jobId);
+                       } else {
+                               LOG.debug("New leader information for job {}. Address: {}, leader id: {}.",
+                                       jobId, leaderAddress, leaderId);
+
+                               if (leaderAddress == null || leaderAddress.isEmpty()) {
+                                       // the leader lost leadership but there is no other leader yet.
+                                       if (rpcConnection != null) {
+                                               rpcConnection.close();
+                                               // forget the closed connection, so that the next detected leader
+                                               // (even one with the same leader id) gets a fresh connection
+                                               // instead of a start() call on the closed one
+                                               rpcConnection = null;
+                                       }
+
+                                       jobLeaderListener.jobManagerLostLeadership(jobId, currentLeaderId);
+
+                                       currentLeaderId = leaderId;
+                               } else {
+                                       currentLeaderId = leaderId;
+
+                                       if (rpcConnection != null && leaderId.equals(rpcConnection.getTargetLeaderId())) {
+                                               // we are already trying to connect to this leader: keep the running
+                                               // connection attempt instead of starting it a second time
+                                               LOG.debug("Already trying to connect to job manager {} with leader id {} for job {}.",
+                                                       leaderAddress, leaderId, jobId);
+                                       } else {
+                                               if (rpcConnection != null) {
+                                                       rpcConnection.close();
+                                               }
+
+                                               rpcConnection = new JobManagerRegisteredRpcConnection(
+                                                       LOG,
+                                                       leaderAddress,
+                                                       leaderId,
+                                                       rpcService.getExecutor());
+
+                                               // double check for a concurrent stop operation
+                                               if (stopped) {
+                                                       rpcConnection.close();
+                                               } else {
+                                                       LOG.info("Try to register at job manager {} with leader id {}.", leaderAddress, leaderId);
+                                                       rpcConnection.start();
+                                               }
+                                       }
+                               }
+                       }
+               }
+
+               @Override
+               public void handleError(Exception exception) {
+                       if (stopped) {
+                               LOG.debug("{}'s leader retrieval listener reported an exception for job {}. " +
+                                       "However, the service is no longer running.", JobLeaderService.class.getSimpleName(),
+                                       jobId, exception);
+                       } else {
+                               jobLeaderListener.handleError(exception);
+                       }
+               }
+
+               /**
+                * Rpc connection for the job manager <--> task manager connection.
+                */
+               private final class JobManagerRegisteredRpcConnection extends RegisteredRpcConnection<JobMasterGateway, JMTMRegistrationSuccess> {
+
+                       JobManagerRegisteredRpcConnection(
+                               Logger log,
+                               String targetAddress,
+                               UUID targetLeaderId,
+                               Executor executor) {
+                               super(log, targetAddress, targetLeaderId, executor);
+                       }
+
+                       @Override
+                       protected RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> generateRegistration() {
+                               return new JobLeaderService.JobManagerRetryingRegistration(
+                                       LOG,
+                                       rpcService,
+                                       "JobManager",
+                                       JobMasterGateway.class,
+                                       getTargetAddress(),
+                                       getTargetLeaderId(),
+                                       ownerAddress,
+                                       ownerProcessId);
+                       }
+
+                       @Override
+                       protected void onRegistrationSuccess(JMTMRegistrationSuccess success) {
+                               // filter out old registration attempts
+                               if (getTargetLeaderId().equals(currentLeaderId)) {
+                                       log.info("Successful registration at job manager {} for job {}.", getTargetAddress(), jobId);
+
+                                       jobLeaderListener.jobManagerGainedLeadership(jobId, getTargetGateway(), getTargetLeaderId(), success);
+                               } else {
+                                       log.debug("Encountered obsolete JobManager registration success from {} with leader session ID {}.", getTargetAddress(), getTargetLeaderId());
+                               }
+                       }
+
+                       @Override
+                       protected void onRegistrationFailure(Throwable failure) {
+                               // filter out old registration attempts
+                               if (getTargetLeaderId().equals(currentLeaderId)) {
+                                       log.info("Failed to register at job manager {} for job {}.", getTargetAddress(), jobId);
+                                       jobLeaderListener.handleError(failure);
+                               } else {
+                                       log.debug("Obsolete JobManager registration failure from {} with leader session ID {}.", getTargetAddress(), getTargetLeaderId(), failure);
+                               }
+                       }
+               }
+       }
+
+       /**
+        * Retrying registration for the job manager <--> task manager connection.
+        */
+       private static final class JobManagerRetryingRegistration extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> {
+
+               /** Address of the task manager which registers at the job manager */
+               private final String taskManagerAddress;
+
+               /** Process id of the task manager which registers at the job manager */
+               private final ResourceID taskManagerProcessId;
+
+               JobManagerRetryingRegistration(
+                       Logger log,
+                       RpcService rpcService,
+                       String targetName,
+                       Class<JobMasterGateway> targetType,
+                       String targetAddress,
+                       UUID leaderId,
+                       String taskManagerAddress,
+                       ResourceID taskManagerProcessId) {
+
+                       super(log, rpcService, targetName, targetType, targetAddress, leaderId);
+
+                       this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress);
+                       this.taskManagerProcessId = Preconditions.checkNotNull(taskManagerProcessId);
+               }
+
+               @Override
+               protected Future<RegistrationResponse> invokeRegistration(JobMasterGateway gateway, UUID leaderId, long timeoutMillis) throws Exception {
+                       return gateway.registerTaskManager(
+                               taskManagerAddress,
+                               taskManagerProcessId,
+                               leaderId,
+                               Time.milliseconds(timeoutMillis));
+               }
+       }
+
+       /**
+        * Internal state of the service
+        */
+       private enum State {
+               CREATED, STARTED, STOPPED
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
index 6fcd082..8d2057a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerConnection.java
@@ -34,7 +34,7 @@ import java.util.UUID;
 public class JobManagerConnection {
 
        // Job master leader session id
-       private final UUID jobMasterLeaderId;
+       private final UUID leaderId;
 
        // Gateway to the job master
        private final JobMasterGateway jobMasterGateway;
@@ -55,15 +55,14 @@ public class JobManagerConnection {
        private final PartitionStateChecker partitionStateChecker;
 
        public JobManagerConnection(
-                       UUID jobMasterLeaderId,
-                       JobMasterGateway jobMasterGateway,
-                       TaskManagerActions taskManagerActions,
-                       CheckpointResponder checkpointResponder,
-                       LibraryCacheManager libraryCacheManager,
-                       ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier,
-                       PartitionStateChecker partitionStateChecker)
-       {
-               this.jobMasterLeaderId = 
Preconditions.checkNotNull(jobMasterLeaderId);
+               JobMasterGateway jobMasterGateway,
+               UUID leaderId,
+               TaskManagerActions taskManagerActions,
+               CheckpointResponder checkpointResponder,
+               LibraryCacheManager libraryCacheManager,
+               ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier,
+               PartitionStateChecker partitionStateChecker) {
+               this.leaderId = Preconditions.checkNotNull(leaderId);
                this.jobMasterGateway = 
Preconditions.checkNotNull(jobMasterGateway);
                this.taskManagerActions = 
Preconditions.checkNotNull(taskManagerActions);
                this.checkpointResponder = 
Preconditions.checkNotNull(checkpointResponder);
@@ -72,8 +71,8 @@ public class JobManagerConnection {
                this.partitionStateChecker = 
Preconditions.checkNotNull(partitionStateChecker);
        }
 
-       public UUID getJobMasterLeaderId() {
-               return jobMasterLeaderId;
+       public UUID getLeaderId() {
+               return leaderId;
        }
 
        public JobMasterGateway getJobManagerGateway() {

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java
new file mode 100644
index 0000000..00c467e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobManagerTable.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor;
+
+import org.apache.flink.api.common.JobID;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Container for multiple {@link JobManagerConnection} registered under their respective job id.
+ *
+ * <p>At most one connection is kept per job: {@link #put(JobID, JobManagerConnection)} never
+ * replaces an existing connection. The table performs no synchronization of its own.
+ */
+public class JobManagerTable {
+
+       /** Job manager connections, keyed by the id of the job they belong to */
+       private final Map<JobID, JobManagerConnection> jobManagerConnections;
+
+       public JobManagerTable() {
+               jobManagerConnections = new HashMap<>(4);
+       }
+
+       /**
+        * Checks whether a job manager connection is registered for the given job.
+        *
+        * @param jobId identifying the job
+        * @return True if a connection is registered for the job; otherwise false
+        */
+       public boolean contains(JobID jobId) {
+               return jobManagerConnections.containsKey(jobId);
+       }
+
+       /**
+        * Registers the given connection for the given job unless the job already has a connection.
+        *
+        * @param jobId identifying the job
+        * @param jobManagerConnection to register for the job
+        * @return True if the connection was registered; false if the job already had a connection,
+        *         in which case the table is left unchanged
+        */
+       public boolean put(JobID jobId, JobManagerConnection jobManagerConnection) {
+               // look up first instead of overwriting the existing connection and writing it back
+               if (jobManagerConnections.get(jobId) != null) {
+                       return false;
+               }
+
+               jobManagerConnections.put(jobId, jobManagerConnection);
+
+               return true;
+       }
+
+       /**
+        * Removes the connection registered for the given job.
+        *
+        * @param jobId identifying the job
+        * @return the removed connection, or null if none was registered for the job
+        */
+       public JobManagerConnection remove(JobID jobId) {
+               return jobManagerConnections.remove(jobId);
+       }
+
+       /**
+        * Returns the connection registered for the given job.
+        *
+        * @param jobId identifying the job
+        * @return the registered connection, or null if none is registered for the job
+        */
+       public JobManagerConnection get(JobID jobId) {
+               return jobManagerConnections.get(jobId);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index e642315..3e3a544 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -19,12 +19,14 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobCache;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -34,7 +36,6 @@ import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
 import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
@@ -42,6 +43,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNo
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.memory.MemoryManager;
@@ -51,7 +53,6 @@ import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
@@ -59,6 +60,7 @@ import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.taskexecutor.exceptions.CheckpointException;
 import org.apache.flink.runtime.taskexecutor.exceptions.PartitionException;
+import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskexecutor.exceptions.TaskException;
 import 
org.apache.flink.runtime.taskexecutor.exceptions.TaskSubmissionException;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
@@ -81,9 +83,10 @@ import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -97,6 +100,9 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        /** The connection information of this task manager */
        private final TaskManagerLocation taskManagerLocation;
 
+       /** Exclusive upper bound for blob server ports (valid ports are below 65536) */
+       public static final int MAX_BLOB_PORT = 65536;
+
        /** The access to the leader election and retrieval services */
        private final HighAvailabilityServices haServices;
 
@@ -121,10 +127,6 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        private final TaskManagerMetricGroup taskManagerMetricGroup;
 
        private final BroadcastVariableManager broadcastVariableManager;
-       
-       /** Slots which have become available but haven't been confirmed by the 
RM */
-       private final Set<SlotID> unconfirmedFreeSlots;
-
 
        private final FileCache fileCache;
 
@@ -140,6 +142,10 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
        private final TaskSlotTable taskSlotTable;
 
+       private final JobManagerTable jobManagerTable;
+
+       private final JobLeaderService jobLeaderService;
+
        // 
------------------------------------------------------------------------
 
        public TaskExecutor(
@@ -155,6 +161,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                BroadcastVariableManager broadcastVariableManager,
                FileCache fileCache,
                TaskSlotTable taskSlotTable,
+               JobManagerTable jobManagerTable,
+               JobLeaderService jobLeaderService,
                FatalErrorHandler fatalErrorHandler) {
 
                super(rpcService);
@@ -173,10 +181,10 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                this.taskManagerMetricGroup = 
checkNotNull(taskManagerMetricGroup);
                this.broadcastVariableManager = 
checkNotNull(broadcastVariableManager);
                this.fileCache = checkNotNull(fileCache);
+               this.jobManagerTable = checkNotNull(jobManagerTable);
+               this.jobLeaderService = checkNotNull(jobLeaderService);
 
                this.jobManagerConnections = new HashMap<>(4);
-
-               this.unconfirmedFreeSlots = new HashSet<>();
        }
 
        // 
------------------------------------------------------------------------
@@ -195,7 +203,10 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
 
                // tell the task slot table who's responsible for the task slot 
actions
-               taskSlotTable.start(new SlotActionsImpl(), 
taskManagerConfiguration.getTimeout());
+               taskSlotTable.start(new SlotActionsImpl());
+
+               // start the job leader service
+               jobLeaderService.start(getAddress(), getRpcService(), 
haServices, new JobLeaderListenerImpl());
        }
 
        /**
@@ -207,7 +218,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
                taskSlotTable.stop();
 
-               if (resourceManagerConnection.isConnected()) {
+               if (isConnectedToResourceManager()) {
                        try {
                                resourceManagerConnection.close();
                        } catch (Exception e) {
@@ -248,30 +259,39 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                log.info("Stopped TaskManager {}.", getAddress());
        }
 
-       // 
========================================================================
+       // 
======================================================================
        //  RPC methods
-       // 
========================================================================
+       // 
======================================================================
 
        // 
----------------------------------------------------------------------
        // Task lifecycle RPCs
        // 
----------------------------------------------------------------------
 
        @RpcMethod
-       public Acknowledge submitTask(TaskDeploymentDescriptor tdd, ResourceID 
jobManagerID) throws TaskSubmissionException {
+       public Acknowledge submitTask(TaskDeploymentDescriptor tdd, UUID 
jobManagerLeaderId) throws TaskSubmissionException {
 
-               JobManagerConnection jobManagerConnection = 
getJobManagerConnection(jobManagerID);
+               JobManagerConnection jobManagerConnection = 
jobManagerTable.get(tdd.getJobID());
 
                if (jobManagerConnection == null) {
-                       final String message = "Could not submit task because 
JobManager " + jobManagerID +
-                               " was not associated.";
+                       final String message = "Could not submit task because 
there is no JobManager " +
+                               "associated for the job " + tdd.getJobID() + 
'.';
+
+                       log.debug(message);
+                       throw new TaskSubmissionException(message);
+               }
+
+               if 
(!jobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) {
+                       final String message = "Rejecting the task submission 
because the job manager leader id " +
+                               jobManagerLeaderId + " does not match the 
expected job manager leader id " +
+                               jobManagerConnection.getLeaderId() + '.';
 
                        log.debug(message);
                        throw new TaskSubmissionException(message);
                }
 
-               if (!taskSlotTable.existActiveSlot(tdd.getJobID(), 
tdd.getAllocationID())) {
+               if (!taskSlotTable.existsActiveSlot(tdd.getJobID(), 
tdd.getAllocationId())) {
                        final String message = "No task slot allocated for job 
ID " + tdd.getJobID() +
-                               " and allocation ID " + tdd.getAllocationID() + 
'.';
+                               " and allocation ID " + tdd.getAllocationId() + 
'.';
                        log.debug(message);
                        throw new TaskSubmissionException(message);
                }
@@ -279,7 +299,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                TaskMetricGroup taskMetricGroup = 
taskManagerMetricGroup.addTaskForJob(tdd);
 
                InputSplitProvider inputSplitProvider = new 
RpcInputSplitProvider(
-                               jobManagerConnection.getJobMasterLeaderId(),
+                               jobManagerConnection.getLeaderId(),
                                jobManagerConnection.getJobManagerGateway(),
                                tdd.getJobID(),
                                tdd.getVertexID(),
@@ -375,7 +395,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        // 
----------------------------------------------------------------------
 
        @RpcMethod
-       public Acknowledge updatePartitions(final ExecutionAttemptID 
executionAttemptID, Collection<PartitionInfo> partitionInfos) throws 
PartitionException {
+       public Acknowledge updatePartitions(final ExecutionAttemptID 
executionAttemptID, Iterable<PartitionInfo> partitionInfos) throws 
PartitionException {
                final Task task = taskSlotTable.getTask(executionAttemptID);
 
                if (task != null) {
@@ -471,38 +491,319 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
+       // 
----------------------------------------------------------------------
+       // Slot allocation RPCs
+       // 
----------------------------------------------------------------------
+
        /**
         * Requests a slot from the TaskManager
         *
-        * @param slotID Slot id for the request
-        * @param allocationID id for the request
-        * @param resourceManagerLeaderID current leader id of the 
ResourceManager
+        * @param slotId identifying the requested slot
+        * @param jobId identifying the job for which the request is issued
+        * @param allocationId id for the request
+        * @param targetAddress of the job manager requesting the slot
+        * @param rmLeaderId current leader id of the ResourceManager
+        * @throws SlotAllocationException if the slot allocation fails
         * @return answer to the slot request
         */
        @RpcMethod
-       public TMSlotRequestReply requestSlot(SlotID slotID, AllocationID 
allocationID, UUID resourceManagerLeaderID) {
-               if 
(!resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderID))
 {
-                       return new TMSlotRequestRejected(
-                               resourceManagerConnection.getRegistrationId(), 
getResourceID(), allocationID);
+       public TMSlotRequestReply requestSlot(
+               final SlotID slotId,
+               final JobID jobId,
+               final AllocationID allocationId,
+               final String targetAddress,
+               final UUID rmLeaderId) throws SlotAllocationException {
+               log.info("Receive slot request {} for job {} from resource 
manager with leader id {}.",
+                       allocationId, jobId, rmLeaderId);
+
+               // only requests from the currently connected resource manager leader are served
+               if (resourceManagerConnection == null) {
+                       final String message = "TaskManager is not connected to 
a resource manager.";
+                       log.debug(message);
+                       throw new SlotAllocationException(message);
                }
-               if (unconfirmedFreeSlots.contains(slotID)) {
-                       // check if request has not been blacklisted because 
the notification of a free slot
-                       // has not been confirmed by the ResourceManager
-                       return new TMSlotRequestRejected(
-                               resourceManagerConnection.getRegistrationId(), 
getResourceID(), allocationID);
+
+               if 
(!resourceManagerConnection.getTargetLeaderId().equals(rmLeaderId)) {
+                       final String message = "The leader id " + rmLeaderId +
+                               " does not match with the leader id of the 
connected resource manager " +
+                               resourceManagerConnection.getTargetLeaderId() + 
'.';
+
+                       log.debug(message);
+                       throw new SlotAllocationException(message);
+               }
+
+               // Allocate a free slot. A slot that is already allocated for exactly this job and
+               // allocation id (a repeated request) is accepted as-is; any other allocation is rejected.
+               if (taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
+                       if (taskSlotTable.allocateSlot(slotId.getSlotNumber(), 
jobId, allocationId, taskManagerConfiguration.getTimeout())) {
+                               log.info("Allocated slot for {}.", 
allocationId);
+                       } else {
+                               log.info("Could not allocate slot for {}.", 
allocationId);
+                               throw new SlotAllocationException("Could not 
allocate slot.");
+                       }
+               } else if (!taskSlotTable.isAllocated(slotId.getSlotNumber(), 
jobId, allocationId)) {
+                       final String message = "The slot " + slotId + " has 
already been allocated for a different job.";
+
+                       log.info(message);
+                       throw new SlotAllocationException(message);
+               }
+
+               // If a connection to the job's leader already exists, offer the slot right away.
+               // Otherwise register the job at the job leader service, which is responsible for
+               // establishing the connection to the job's leader (per the change description).
+               if (jobManagerTable.contains(jobId)) {
+                       offerSlotsToJobManager(jobId);
+               } else {
+                       try {
+                               jobLeaderService.addJob(jobId, targetAddress);
+                       } catch (Exception e) {
+                               // free the allocated slot
+                               try {
+                                       taskSlotTable.freeSlot(allocationId);
+                               } catch (SlotNotFoundException 
slotNotFoundException) {
+                                       // slot no longer existent, this should 
actually never happen, because we've
+                                       // just allocated the slot. So let's 
fail hard in this case!
+                                       onFatalError(slotNotFoundException);
+                               }
+
+                               // sanity check
+                               if 
(!taskSlotTable.isSlotFree(slotId.getSlotNumber())) {
+                                       onFatalError(new Exception("Could not 
free slot " + slotId));
+                               }
+
+                               throw new SlotAllocationException("Could not 
add job to job leader service.", e);
+                       }
                }
-               return new TMSlotRequestRegistered(new InstanceID(), 
ResourceID.generate(), allocationID);
 
+               return new 
TMSlotRequestRegistered(resourceManagerConnection.getRegistrationId(), 
getResourceID(), allocationId);
        }
 
-       // 
------------------------------------------------------------------------
+       // 
======================================================================
        //  Internal methods
+       // 
======================================================================
+
+       // 
------------------------------------------------------------------------
+       //  Internal resource manager connection methods
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Reacts to a change of the resource manager leader. Any current connection (or pending
+        * connection attempt) is closed. If a new leader address is known, a new connection to
+        * that leader is started.
+        *
+        * @param newLeaderAddress address of the new leader, or {@code null} if the current leader
+        *                         lost its leadership without a new leader being known yet
+        * @param newLeaderId leader id of the new leader
+        */
+       private void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) {
+               if (resourceManagerConnection != null) {
+                       if (newLeaderAddress != null) {
+                               // the resource manager switched to a new leader
+                               log.info("ResourceManager leader changed from {} to {}. Registering at new leader.",
+                                       resourceManagerConnection.getTargetAddress(), newLeaderAddress);
+                       } else {
+                               // address null means that the current leader is lost without a new leader being there, yet
+                               log.info("Current ResourceManager {} lost leader status. Waiting for new ResourceManager leader.",
+                                       resourceManagerConnection.getTargetAddress());
+                       }
+
+                       // drop the current connection or connection attempt; the enclosing null check
+                       // already guarantees there is a connection to close
+                       resourceManagerConnection.close();
+                       resourceManagerConnection = null;
+               }
+
+               // establish a connection to the new leader
+               if (newLeaderAddress != null) {
+                       log.info("Attempting to register at ResourceManager {}", newLeaderAddress);
+                       resourceManagerConnection =
+                               new TaskExecutorToResourceManagerConnection(
+                                       log,
+                                       this,
+                                       newLeaderAddress,
+                                       newLeaderId,
+                                       getMainThreadExecutor());
+                       resourceManagerConnection.start();
+               }
+       }
+
        // 
------------------------------------------------------------------------
+       //  Internal job manager connection methods
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Offers all slots that the task slot table reports as allocated for the given job to the
+        * job's current leader, provided a connection to it exists.
+        *
+        * <p>Accepted slots are marked active. Slots that cannot be marked active are reported back
+        * via {@code failSlot}. Slots that were not accepted are freed. If the offer times out, it
+        * is retried; on any other failure, all offered slots are freed. Responses are ignored if
+        * the job's leader changed in the meantime.
+        *
+        * @param jobId identifying the job whose allocated slots are offered
+        */
+       private void offerSlotsToJobManager(final JobID jobId) {
+               final JobManagerConnection jobManagerConnection = jobManagerTable.get(jobId);
+
+               if (jobManagerConnection == null) {
+                       log.debug("There is no job manager connection to the leader of job {}.", jobId);
+               } else {
+                       if (taskSlotTable.hasAllocatedSlots(jobId)) {
+                               log.info("Offer reserved slots to the leader of job {}.", jobId);
+
+                               final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway();
+
+                               final Iterator<AllocationID> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId);
+                               final UUID leaderId = jobManagerConnection.getLeaderId();
+
+                               final Collection<AllocationID> reservedSlots = new HashSet<>(2);
+
+                               while (reservedSlotsIterator.hasNext()) {
+                                       reservedSlots.add(reservedSlotsIterator.next());
+                               }
+
+                               Future<Iterable<AllocationID>> acceptedSlotsFuture = jobMasterGateway.offerSlots(
+                                       reservedSlots,
+                                       leaderId,
+                                       taskManagerConfiguration.getTimeout());
+
+                               acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction<Iterable<AllocationID>>() {
+                                       @Override
+                                       public void accept(Iterable<AllocationID> acceptedSlots) {
+                                               // check if the response is still valid
+                                               if (isJobManagerConnectionValid(jobId, leaderId)) {
+                                                       // mark accepted slots active
+                                                       for (AllocationID acceptedSlot : acceptedSlots) {
+                                                               try {
+                                                                       if (!taskSlotTable.markSlotActive(acceptedSlot)) {
+                                                                               // the slot is either free or releasing at the moment;
+                                                                               // name the slot (not the job) so the JobMaster knows which one failed
+                                                                               final String message = "Could not mark slot " + acceptedSlot + " active.";
+                                                                               log.debug(message);
+                                                                               jobMasterGateway.failSlot(acceptedSlot, leaderId, new Exception(message));
+                                                                       }
+
+                                                                       // remove the assigned slots so that we can free the left overs
+                                                                       reservedSlots.remove(acceptedSlot);
+                                                               } catch (SlotNotFoundException e) {
+                                                                       log.debug("Could not mark slot {} active.", acceptedSlot, e);
+                                                                       jobMasterGateway.failSlot(acceptedSlot, leaderId, e);
+                                                               }
+                                                       }
+
+                                                       final Exception e = new Exception("The slot was rejected by the JobManager.");
+
+                                                       for (AllocationID rejectedSlot : reservedSlots) {
+                                                               freeSlot(rejectedSlot, e);
+                                                       }
+                                               } else {
+                                                       // discard the response since there is a new leader for the job
+                                                       log.debug("Discard offer slot response since there is a new leader " +
+                                                               "for the job {}.", jobId);
+                                               }
+                                       }
+                               }, getMainThreadExecutor());
+
+                               acceptedSlotsFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
+                                       @Override
+                                       public Void apply(Throwable throwable) {
+                                               if (throwable instanceof TimeoutException) {
+                                                       // We ran into a timeout. Try again.
+                                                       offerSlotsToJobManager(jobId);
+                                               } else {
+                                                       // We encountered an exception. Free the slots and return them to the RM.
+                                                       for (AllocationID reservedSlot : reservedSlots) {
+                                                               freeSlot(reservedSlot, throwable);
+                                                       }
+                                               }
+
+                                               return null;
+                                       }
+                               }, getMainThreadExecutor());
+                       } else {
+                               log.debug("There are no unassigned slots for the job {}.", jobId);
+                       }
+               }
+       }
+
+       /**
+        * Records a connection to the given job manager leader for the job and then offers the
+        * job's allocated slots to it. An existing connection is kept if it has the same leader
+        * id. An existing connection with a different leader id is closed and replaced.
+        *
+        * @param jobId job for which the connection is established
+        * @param jobMasterGateway gateway of the job manager leader
+        * @param jobManagerLeaderId leader id of the job manager
+        * @param registrationSuccess registration response; its blob port is used for the connection
+        */
+       private void establishJobManagerConnection(JobID jobId, 
JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, 
JMTMRegistrationSuccess registrationSuccess) {
+               log.info("Establish JobManager connection for job {}.", jobId);
+
+               if (jobManagerTable.contains(jobId)) {
+                       JobManagerConnection oldJobManagerConnection = 
jobManagerTable.get(jobId);
+
+                       // only replace the connection if it belongs to a different leader
+                       if 
(!oldJobManagerConnection.getLeaderId().equals(jobManagerLeaderId)) {
+                               closeJobManagerConnection(jobId);
+                               jobManagerTable.put(jobId, 
associateWithJobManager(jobMasterGateway, jobManagerLeaderId, 
registrationSuccess.getBlobPort()));
+                       }
+               } else {
+                       jobManagerTable.put(jobId, 
associateWithJobManager(jobMasterGateway, jobManagerLeaderId, 
registrationSuccess.getBlobPort()));
+               }
 
-       private JobManagerConnection getJobManagerConnection(ResourceID 
jobManagerID) {
-               return jobManagerConnections.get(jobManagerID);
+               offerSlotsToJobManager(jobId);
        }
 
+       /**
+        * Closes the connection to the job manager of the given job. It fails all of the job's
+        * tasks and moves the job's active slots back to the allocated state, so they can time out
+        * again. It then removes the connection from the job manager table and disassociates from
+        * the job manager.
+        *
+        * @param jobId job whose job manager connection is closed
+        */
+       private void closeJobManagerConnection(JobID jobId) {
+               log.info("Close JobManager connection for job {}.", jobId);
+
+               // 1. fail tasks running under this JobID
+               Iterator<Task> tasks = taskSlotTable.getTasks(jobId);
+
+               while (tasks.hasNext()) {
+                       tasks.next().failExternally(new Exception("JobManager responsible for " + jobId +
+                               " lost the leadership."));
+               }
+
+               // 2. Move the active slots to state allocated (possible to time out again)
+               Iterator<AllocationID> activeSlots = taskSlotTable.getActiveSlots(jobId);
+
+               while (activeSlots.hasNext()) {
+                       AllocationID activeSlot = activeSlots.next();
+
+                       try {
+                               if (!taskSlotTable.markSlotInactive(activeSlot, taskManagerConfiguration.getTimeout())) {
+                                       // NOTE(review): freeing while iterating getActiveSlots(jobId) may modify the
+                                       // backing collection -- confirm TaskSlotTable's iterator tolerates this
+                                       freeSlot(activeSlot, new Exception("Slot could not be marked inactive."));
+                               }
+                       } catch (SlotNotFoundException e) {
+                               // log the slot, not the job, so the failing slot can be identified
+                               log.debug("Could not mark the slot {} inactive.", activeSlot, e);
+                       }
+               }
+
+               // 3. Disassociate from the JobManager
+               JobManagerConnection jobManagerConnection = jobManagerTable.remove(jobId);
+
+               if (jobManagerConnection != null) {
+                       try {
+                               disassociateFromJobManager(jobManagerConnection);
+                       } catch (IOException e) {
+                               log.warn("Could not properly disassociate from JobManager {}.",
+                                       jobManagerConnection.getJobManagerGateway().getAddress(), e);
+                       }
+               }
+       }
+
+       /**
+        * Creates the per-job services for talking to the given job manager leader and bundles
+        * them in a {@link JobManagerConnection}. The services are task manager actions, a
+        * checkpoint responder, a library cache manager backed by a blob cache on the job
+        * manager's address, a result partition consumable notifier and a partition state checker.
+        *
+        * @param jobMasterGateway gateway of the job manager leader
+        * @param jobManagerLeaderId leader id of the job manager
+        * @param blobPort blob server port on the job manager's address; must be in [1, MAX_BLOB_PORT)
+        * @return the connection bundling the gateway, the leader id and the created services
+        */
+       private JobManagerConnection associateWithJobManager(JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, int blobPort) {
+               Preconditions.checkNotNull(jobManagerLeaderId);
+               Preconditions.checkNotNull(jobMasterGateway);
+               // both bounds must hold; combining them with '||' would accept every int
+               Preconditions.checkArgument(blobPort > 0 && blobPort < MAX_BLOB_PORT, "Blob port is out of range.");
+
+               TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobManagerLeaderId, jobMasterGateway);
+
+               CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway);
+
+               InetSocketAddress address = new InetSocketAddress(jobMasterGateway.getAddress(), blobPort);
+
+               BlobCache blobCache = new BlobCache(address, taskManagerConfiguration.getConfiguration());
+
+               LibraryCacheManager libraryCacheManager = new BlobLibraryCacheManager(
+                       blobCache,
+                       taskManagerConfiguration.getCleanupInterval());
+
+               ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
+                       jobManagerLeaderId,
+                       jobMasterGateway,
+                       getRpcService().getExecutor(),
+                       taskManagerConfiguration.getTimeout());
+
+               PartitionStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobManagerLeaderId, jobMasterGateway);
+
+               return new JobManagerConnection(
+                       jobMasterGateway,
+                       jobManagerLeaderId,
+                       taskManagerActions,
+                       checkpointResponder,
+                       libraryCacheManager,
+                       resultPartitionConsumableNotifier,
+                       partitionStateChecker);
+       }
+
+       /**
+        * Tears down a job manager connection. It first tells the job manager that this task
+        * manager disconnects, then shuts down the connection's library cache manager.
+        *
+        * @param jobManagerConnection connection to tear down; must not be null
+        * @throws IOException if one of the teardown steps fails
+        */
+       private void disassociateFromJobManager(JobManagerConnection jobManagerConnection) throws IOException {
+               Preconditions.checkNotNull(jobManagerConnection);
+
+               // notify the job manager before releasing the local per-job resources
+               jobManagerConnection.getJobManagerGateway().disconnectTaskManager(getResourceID());
+               jobManagerConnection.getLibraryCacheManager().shutdown();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Internal task methods
+       // 
------------------------------------------------------------------------
+
        private void failTask(final ExecutionAttemptID executionAttemptID, 
final Throwable cause) {
                final Task task = taskSlotTable.getTask(executionAttemptID);
 
@@ -571,94 +872,11 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
-       private void notifyOfNewResourceManagerLeader(String newLeaderAddress, 
UUID newLeaderId) {
-               if (resourceManagerConnection != null) {
-                       if (newLeaderAddress != null) {
-                               // the resource manager switched to a new leader
-                               log.info("ResourceManager leader changed from 
{} to {}. Registering at new leader.",
-                                       
resourceManagerConnection.getTargetAddress(), newLeaderAddress);
-                       }
-                       else {
-                               // address null means that the current leader 
is lost without a new leader being there, yet
-                               log.info("Current ResourceManager {} lost 
leader status. Waiting for new ResourceManager leader.",
-                                       
resourceManagerConnection.getTargetAddress());
-                       }
-
-                       // drop the current connection or connection attempt
-                       if (resourceManagerConnection != null) {
-                               resourceManagerConnection.close();
-                               resourceManagerConnection = null;
-                       }
-               }
-
-               unconfirmedFreeSlots.clear();
-
-               // establish a connection to the new leader
-               if (newLeaderAddress != null) {
-                       log.info("Attempting to register at ResourceManager 
{}", newLeaderAddress);
-                       resourceManagerConnection =
-                               new TaskExecutorToResourceManagerConnection(
-                                       log,
-                                       this,
-                                       newLeaderAddress,
-                                       newLeaderId,
-                                       getMainThreadExecutor());
-                       resourceManagerConnection.start();
-               }
-       }
-
-       private JobManagerConnection associateWithJobManager(UUID 
jobMasterLeaderId,
-                       JobMasterGateway jobMasterGateway, int blobPort)
-       {
-               Preconditions.checkNotNull(jobMasterLeaderId);
-               Preconditions.checkNotNull(jobMasterGateway);
-               Preconditions.checkArgument(blobPort > 0 || blobPort <= 65535, 
"Blob port is out of range.");
-
-               TaskManagerActions taskManagerActions = new 
TaskManagerActionsImpl(jobMasterLeaderId, jobMasterGateway);
-
-               CheckpointResponder checkpointResponder = new 
RpcCheckpointResponder(jobMasterGateway);
-
-               InetSocketAddress address = new 
InetSocketAddress(jobMasterGateway.getAddress(), blobPort);
-
-               BlobCache blobCache = new BlobCache(address, 
taskManagerConfiguration.getConfiguration());
-
-               LibraryCacheManager libraryCacheManager = new 
BlobLibraryCacheManager(
-                       blobCache,
-                       taskManagerConfiguration.getCleanupInterval());
-
-               ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier(
-                               jobMasterLeaderId,
-                               jobMasterGateway,
-                               getRpcService().getExecutor(),
-                               taskManagerConfiguration.getTimeout());
-
-               PartitionStateChecker partitionStateChecker = new 
RpcPartitionStateChecker(jobMasterLeaderId, jobMasterGateway);
-
-               return new JobManagerConnection(
-                               jobMasterLeaderId,
-                               jobMasterGateway,
-                               taskManagerActions,
-                               checkpointResponder,
-                               libraryCacheManager,
-                               resultPartitionConsumableNotifier,
-                               partitionStateChecker);
-       }
-
-       private void disassociateFromJobManager(JobManagerConnection 
jobManagerConnection) throws IOException {
-               if (jobManagerConnection != null) {
-                       JobMasterGateway jobManagerGateway = 
jobManagerConnection.getJobManagerGateway();
-
-                       
jobManagerGateway.disconnectTaskManager(getResourceID());
-
-                       
jobManagerConnection.getLibraryCacheManager().shutdown();
-               }
-       }
-
-       private void freeSlot(AllocationID allocationId) {
+       private void freeSlot(AllocationID allocationId, Throwable cause) {
                Preconditions.checkNotNull(allocationId);
 
                try {
-                       int freedSlotIndex = 
taskSlotTable.freeSlot(allocationId);
+                       int freedSlotIndex = 
taskSlotTable.freeSlot(allocationId, cause);
 
                        if (freedSlotIndex != -1 && 
isConnectedToResourceManager()) {
                                // the slot was freed. Tell the RM about it
@@ -674,21 +892,35 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
+       private void freeSlot(AllocationID allocationId) {
+               freeSlot(allocationId, new Exception("The slot " + allocationId 
+ " is beeing freed."));
+       }
+
        private void timeoutSlot(AllocationID allocationId, UUID ticket) {
                Preconditions.checkNotNull(allocationId);
                Preconditions.checkNotNull(ticket);
 
                if (taskSlotTable.isValidTimeout(allocationId, ticket)) {
-                       freeSlot(allocationId);
+                       freeSlot(allocationId, new Exception("The slot " + 
allocationId + " has timed out."));
                } else {
                        log.debug("Received an invalid timeout for allocation 
id {} with ticket {}.", allocationId, ticket);
                }
        }
 
+       // 
------------------------------------------------------------------------
+       //  Internal utility methods
+       // 
------------------------------------------------------------------------
+
        private boolean isConnectedToResourceManager() {
                return (resourceManagerConnection != null && 
resourceManagerConnection.isConnected());
        }
 
+       private boolean isJobManagerConnectionValid(JobID jobId, UUID leaderId) 
{
+               JobManagerConnection jmConnection = jobManagerTable.get(jobId);
+
+               return jmConnection != null && 
jmConnection.getLeaderId().equals(leaderId);
+       }
+
        // 
------------------------------------------------------------------------
        //  Properties
        // 
------------------------------------------------------------------------
@@ -737,11 +969,6 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                return resourceManagerConnection;
        }
 
-       @VisibleForTesting
-       public void addUnconfirmedFreeSlotNotification(SlotID slotID) {
-               unconfirmedFreeSlots.add(slotID);
-       }
-
        // 
------------------------------------------------------------------------
        //  Utility classes
        // 
------------------------------------------------------------------------
@@ -767,6 +994,44 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
+       private final class JobLeaderListenerImpl implements JobLeaderListener {
+
+               @Override
+               public void jobManagerGainedLeadership(
+                       final JobID jobId,
+                       final JobMasterGateway jobManagerGateway,
+                       final UUID jobLeaderId,
+                       final JMTMRegistrationSuccess registrationMessage) {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       establishJobManagerConnection(
+                                               jobId,
+                                               jobManagerGateway,
+                                               jobLeaderId,
+                                               registrationMessage);
+                               }
+                       });
+               }
+
+               @Override
+               public void jobManagerLostLeadership(final JobID jobId, final 
UUID jobLeaderId) {
+                       log.info("JobManager for job {} with leader id {} lost 
leadership.", jobId, jobLeaderId);
+
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       closeJobManagerConnection(jobId);
+                               }
+                       });
+               }
+
+               @Override
+               public void handleError(Throwable throwable) {
+                       onFatalErrorAsync(throwable);
+               }
+       }
+
        private final class TaskManagerActionsImpl implements 
TaskManagerActions {
                private final UUID jobMasterLeaderId;
                private final JobMasterGateway jobMasterGateway;
@@ -830,5 +1095,4 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        });
                }
        }
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index f062b96..1ffc407 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -18,10 +18,10 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -30,9 +30,9 @@ import org.apache.flink.runtime.executiongraph.PartitionInfo;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
 import org.apache.flink.runtime.taskmanager.Task;
 
-import java.util.Collection;
 import java.util.UUID;
 
 /**
@@ -43,28 +43,31 @@ public interface TaskExecutorGateway extends RpcGateway {
        /**
         * Requests a slot from the TaskManager
         *
-        * @param slotID slot id for the request
-        * @param allocationID id for the request
-        * @param resourceManagerLeaderID current leader id of the 
ResourceManager
+        * @param slotId slot id for the request
+        * @param allocationId id for the request
+        * @param resourceManagerLeaderId current leader id of the 
ResourceManager
+        * @throws SlotAllocationException if the slot allocation fails
         * @return answer to the slot request
         */
        Future<TMSlotRequestReply> requestSlot(
-               SlotID slotID,
-               AllocationID allocationID,
-               UUID resourceManagerLeaderID,
+               SlotID slotId,
+               JobID jobId,
+               AllocationID allocationId,
+               String targetAddress,
+               UUID resourceManagerLeaderId,
                @RpcTimeout Time timeout);
 
        /**
         * Submit a {@link Task} to the {@link TaskExecutor}.
         *
         * @param tdd describing the task to submit
-        * @param jobManagerID identifying the submitting JobManager
+        * @param leaderId of the job leader
         * @param timeout of the submit operation
         * @return Future acknowledge of the successful operation
         */
        Future<Acknowledge> submitTask(
                TaskDeploymentDescriptor tdd,
-               ResourceID jobManagerID,
+               UUID leaderId,
                @RpcTimeout Time timeout);
 
        /**
@@ -74,7 +77,7 @@ public interface TaskExecutorGateway extends RpcGateway {
         * @param partitionInfos telling where the partition can be retrieved 
from
         * @return Future acknowledge if the partitions have been successfully 
updated
         */
-       Future<Acknowledge> updatePartitions(ExecutionAttemptID 
executionAttemptID, Collection<PartitionInfo> partitionInfos);
+       Future<Acknowledge> updatePartitions(ExecutionAttemptID 
executionAttemptID, Iterable<PartitionInfo> partitionInfos);
 
        /**
         * Fail all intermediate result partitions of the given task.

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
index 2dbd9eb..53f030e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java
@@ -68,11 +68,16 @@ public class TaskExecutorToResourceManagerConnection
 
        @Override
        protected void onRegistrationSuccess(TaskExecutorRegistrationSuccess 
success) {
+               log.info("Successful registration at resource manager {} under 
registration id {}.",
+                       getTargetAddress(), success.getRegistrationId());
+
                registrationId = success.getRegistrationId();
        }
 
        @Override
        protected void onRegistrationFailure(Throwable failure) {
+               log.info("Failed to register at resource manager {}.", 
getTargetAddress(), failure);
+
                taskExecutor.onFatalErrorAsync(failure);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index ca1d2ce..9f78682 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -99,6 +99,8 @@ public class TaskManagerRunner implements FatalErrorHandler {
                        taskManagerServices.getBroadcastVariableManager(),
                        taskManagerServices.getFileCache(),
                        taskManagerServices.getTaskSlotTable(),
+                       taskManagerServices.getJobManagerTable(),
+                       taskManagerServices.getJobLeaderService(),
                        this);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index c1728b4..7575ba3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -73,6 +73,8 @@ public class TaskManagerServices {
        private final BroadcastVariableManager broadcastVariableManager;
        private final FileCache fileCache;
        private final TaskSlotTable taskSlotTable;
+       private final JobManagerTable jobManagerTable;
+       private final JobLeaderService jobLeaderService;
 
        private TaskManagerServices(
                TaskManagerLocation taskManagerLocation,
@@ -83,7 +85,9 @@ public class TaskManagerServices {
                TaskManagerMetricGroup taskManagerMetricGroup,
                BroadcastVariableManager broadcastVariableManager,
                FileCache fileCache,
-               TaskSlotTable taskSlotTable) {
+               TaskSlotTable taskSlotTable,
+               JobManagerTable jobManagerTable,
+               JobLeaderService jobLeaderService) {
 
                this.taskManagerLocation = 
Preconditions.checkNotNull(taskManagerLocation);
                this.memoryManager = Preconditions.checkNotNull(memoryManager);
@@ -94,6 +98,8 @@ public class TaskManagerServices {
                this.broadcastVariableManager = 
Preconditions.checkNotNull(broadcastVariableManager);
                this.fileCache = Preconditions.checkNotNull(fileCache);
                this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
+               this.jobManagerTable = 
Preconditions.checkNotNull(jobManagerTable);
+               this.jobLeaderService = 
Preconditions.checkNotNull(jobLeaderService);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -136,6 +142,14 @@ public class TaskManagerServices {
                return taskSlotTable;
        }
 
+       public JobManagerTable getJobManagerTable() {
+               return jobManagerTable;
+       }
+
+       public JobLeaderService getJobLeaderService() {
+               return jobLeaderService;
+       }
+
        // 
--------------------------------------------------------------------------------------------
        //  Static factory methods for task manager services
        // 
--------------------------------------------------------------------------------------------
@@ -190,6 +204,10 @@ public class TaskManagerServices {
                final TimerService<AllocationID> timerService = new 
TimerService<>(new ScheduledThreadPoolExecutor(1));
 
                final TaskSlotTable taskSlotTable = new 
TaskSlotTable(resourceProfiles, timerService);
+
+               final JobManagerTable jobManagerTable = new JobManagerTable();
+
+               final JobLeaderService jobLeaderService = new 
JobLeaderService(resourceID);
                
                return new TaskManagerServices(
                        taskManagerLocation,
@@ -200,7 +218,9 @@ public class TaskManagerServices {
                        taskManagerMetricGroup,
                        broadcastVariableManager,
                        fileCache,
-                       taskSlotTable);
+                       taskSlotTable,
+                       jobManagerTable,
+                       jobLeaderService);
        }
 
        /**

Reply via email to