Repository: flink
Updated Branches:
  refs/heads/flip-6 9da76dcfd -> 928800569


http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
new file mode 100644
index 0000000..66d8102
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/exceptions/SlotAllocationException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.exceptions;
+
+/**
+ * Exception indicating that the slot allocation on the task manager failed.
+ *
+ * <p>All constructors simply forward their arguments to the matching
+ * {@link TaskManagerException} constructor.
+ */
+public class SlotAllocationException extends TaskManagerException {
+
+       private static final long serialVersionUID = -4764932098204266773L;
+
+       /**
+        * Creates the exception with a detail message.
+        *
+        * @param message detail message, passed on to the super class
+        */
+       public SlotAllocationException(String message) {
+               super(message);
+       }
+
+       /**
+        * Creates the exception with a detail message and a cause.
+        *
+        * @param message detail message, passed on to the super class
+        * @param cause underlying failure, passed on to the super class
+        */
+       public SlotAllocationException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       /**
+        * Creates the exception from a cause only.
+        *
+        * @param cause underlying failure, passed on to the super class
+        */
+       public SlotAllocationException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
index 42cb919..88123b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -70,10 +70,7 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
 
        /** Interface for slot actions, such as freeing them or timing them out 
*/
        private SlotActions slotActions;
-
-       /** The timeout for allocated slots */
-       private Time slotTimeout;
-
+       
        /** Whether the table has been started */
        private boolean started;
 
@@ -104,7 +101,6 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
                slotsPerJob = new HashMap<>(4);
 
                slotActions = null;
-               slotTimeout = null;
                started = false;
        }
 
@@ -112,11 +108,9 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
-        * Start the task slot table with the given slot actions and slot timeout value.
+        * Start the task slot table with the given slot actions.
         *
         * @param initialSlotActions to use for slot actions
-        * @param initialSlotTimeout to use for slot timeouts
         */
-       public void start(SlotActions initialSlotActions, Time 
initialSlotTimeout) {
+       public void start(SlotActions initialSlotActions) {
                this.slotActions = 
Preconditions.checkNotNull(initialSlotActions);
-               this.slotTimeout = 
Preconditions.checkNotNull(initialSlotTimeout);
 
                timerService.start(this);
 
@@ -129,7 +123,6 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
        public void stop() {
                started = false;
                timerService.stop();
-               slotTimeout = null;
                slotActions = null;
        }
 
@@ -144,9 +137,10 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
         * @param index of the task slot to allocate
         * @param jobId to allocate the task slot for
         * @param allocationId identifying the allocation
+        * @param slotTimeout until the slot times out
         * @return True if the task slot could be allocated; otherwise false
         */
-       public boolean allocateSlot(int index, JobID jobId, AllocationID 
allocationId) {
+       public boolean allocateSlot(int index, JobID jobId, AllocationID 
allocationId, Time slotTimeout) {
                checkInit();
 
                TaskSlot taskSlot = taskSlots.get(index);
@@ -180,7 +174,7 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
         *
         * @param allocationId to identify the task slot to mark as active
         * @throws SlotNotFoundException if the slot could not be found for the 
given allocation id
-        * @return True if the slot could be marked active
+        * @return True if the slot could be marked active; otherwise false
         */
        public boolean markSlotActive(AllocationID allocationId) throws 
SlotNotFoundException {
                checkInit();
@@ -190,6 +184,8 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
                if (taskSlot != null) {
                        if (taskSlot.markActive()) {
                                // unregister a potential timeout
+                               LOG.info("Activate slot {}.", allocationId);
+
                                timerService.unregisterTimeout(allocationId);
 
                                return true;
@@ -206,10 +202,11 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
         * then a {@link SlotNotFoundException} is thrown.
         *
         * @param allocationId to identify the task slot to mark as inactive
+        * @param slotTimeout until the slot times out
         * @throws SlotNotFoundException if the slot could not be found for the 
given allocation id
         * @return True if the slot could be marked inactive
         */
-       public boolean markSlotInactive(AllocationID allocationId) throws 
SlotNotFoundException {
+       public boolean markSlotInactive(AllocationID allocationId, Time 
slotTimeout) throws SlotNotFoundException {
                checkInit();
 
                TaskSlot taskSlot = getTaskSlot(allocationId);
@@ -253,6 +250,12 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
         */
        public int freeSlot(AllocationID allocationId, Throwable cause) throws 
SlotNotFoundException {
                checkInit();
+               
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Free slot {}.", allocationId, cause);
+               } else {
+                       LOG.info("Free slot {}.", allocationId);
+               }
 
                TaskSlot taskSlot = getTaskSlot(allocationId);
 
@@ -322,8 +325,6 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
         * @return True if the given task slot is allocated for the given job 
and allocation id
         */
        public boolean isAllocated(int index, JobID jobId, AllocationID 
allocationId) {
-               checkInit();
-
                TaskSlot taskSlot = taskSlots.get(index);
 
                return taskSlot.isAllocated(jobId, allocationId);
@@ -336,7 +337,7 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
         * @param allocationId identifying the allocation
         * @return True if there exists a task slot which is active for the 
given job and allocation id.
         */
-       public boolean existActiveSlot(JobID jobId, AllocationID allocationId) {
+       public boolean existsActiveSlot(JobID jobId, AllocationID allocationId) 
{
                TaskSlot taskSlot = getTaskSlot(allocationId);
 
                if (taskSlot != null) {
@@ -431,6 +432,8 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
         * @return The removed task if there is any for the given execution 
attempt id; otherwise null
         */
        public Task removeTask(ExecutionAttemptID executionAttemptID) {
+               checkInit();
+
                TaskSlotMapping taskSlotMapping = 
taskSlotMappings.remove(executionAttemptID);
 
                if (taskSlotMapping != null) {
@@ -481,6 +484,8 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
 
        @Override
        public void notifyTimeout(AllocationID key, UUID ticket) {
+               checkInit();
+
                if (slotActions != null) {
                        slotActions.timeoutSlot(key, ticket);
                }
@@ -493,9 +498,7 @@ public class TaskSlotTable implements 
TimeoutListener<AllocationID> {
        private TaskSlot getTaskSlot(AllocationID allocationId) {
                Preconditions.checkNotNull(allocationId);
 
-               TaskSlot taskSlot = allocationIDTaskSlotMap.get(allocationId);
-
-               return taskSlot;
+               return allocationIDTaskSlotMap.get(allocationId);
        }
 
        private void checkInit() {

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index b67737d..90a829c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -278,7 +278,7 @@ public class Task implements Runnable, TaskActions {
                this.jobId = checkNotNull(tdd.getJobID());
                this.vertexId = checkNotNull(tdd.getVertexID());
                this.executionId  = checkNotNull(tdd.getExecutionId());
-               this.allocationId = checkNotNull(tdd.getAllocationID());
+               this.allocationId = checkNotNull(tdd.getAllocationId());
                this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks();
                this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
                this.taskConfiguration = 
checkNotNull(tdd.getTaskConfiguration());

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 39ea176..993fd19 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -44,6 +45,7 @@ public class TaskDeploymentDescriptorTest {
        public void testSerialization() {
                try {
                        final JobID jobID = new JobID();
+                       final AllocationID allocationId = new AllocationID();
                        final JobVertexID vertexID = new JobVertexID();
                        final ExecutionAttemptID execId = new 
ExecutionAttemptID();
                        final String jobName = "job name";
@@ -61,7 +63,7 @@ public class TaskDeploymentDescriptorTest {
                        final List<URL> requiredClasspaths = new 
ArrayList<URL>(0);
                        final SerializedValue<ExecutionConfig> executionConfig 
= new SerializedValue<>(new ExecutionConfig());
 
-                       final TaskDeploymentDescriptor orig = new 
TaskDeploymentDescriptor(jobID, jobName, vertexID, execId,
+                       final TaskDeploymentDescriptor orig = new 
TaskDeploymentDescriptor(jobID, allocationId, jobName, vertexID, execId,
                                executionConfig, taskName, numberOfKeyGroups, 
indexInSubtaskGroup, currentNumberOfSubtasks, attemptNumber,
                                jobConfiguration, taskConfiguration, 
invokableClass.getName(), producedResults, inputGates,
                                requiredJars, requiredClasspaths, 47);
@@ -69,12 +71,14 @@ public class TaskDeploymentDescriptorTest {
                        final TaskDeploymentDescriptor copy = 
CommonTestUtils.createCopySerializable(orig);
        
                        assertFalse(orig.getJobID() == copy.getJobID());
+                       assertFalse(orig.getAllocationId() == 
copy.getAllocationId());
                        assertFalse(orig.getVertexID() == copy.getVertexID());
                        assertFalse(orig.getTaskName() == copy.getTaskName());
                        assertFalse(orig.getJobConfiguration() == 
copy.getJobConfiguration());
                        assertFalse(orig.getTaskConfiguration() == 
copy.getTaskConfiguration());
 
                        assertEquals(orig.getJobID(), copy.getJobID());
+                       assertEquals(orig.getAllocationId(), 
copy.getAllocationId());
                        assertEquals(orig.getVertexID(), copy.getVertexID());
                        assertEquals(orig.getTaskName(), copy.getTaskName());
                        assertEquals(orig.getNumberOfKeyGroups(), 
copy.getNumberOfKeyGroups());

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index a255027..38e372d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -91,7 +91,7 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        }
 
        @Override
-       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID) 
throws Exception {
+       public LeaderRetrievalService getJobManagerLeaderRetriever(JobID jobID, 
String defaultJobManagerAddress) throws Exception {
                LeaderRetrievalService service = 
this.jobMasterLeaderRetrievers.get(jobID);
                if (service != null) {
                        return service;

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index 49f2268..073aeac 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -75,7 +76,8 @@ public class TaskManagerGroupTest extends TestLogger {
                final ExecutionAttemptID execution21 = new ExecutionAttemptID();
 
                TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
-                       jid1, 
+                       jid1,
+                       new AllocationID(),
                        jobName1, 
                        vertex11, 
                        execution11, 
@@ -91,6 +93,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
                TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
                        jid1,
+                       new AllocationID(),
                        jobName1,
                        vertex12,
                        execution12,
@@ -106,6 +109,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
                TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor(
                        jid2,
+                       new AllocationID(),
                        jobName2,
                        vertex21,
                        execution21,
@@ -121,6 +125,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
                TaskDeploymentDescriptor tdd4 = new TaskDeploymentDescriptor(
                        jid1,
+                       new AllocationID(),
                        jobName1,
                        vertex13,
                        execution13,
@@ -192,6 +197,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
                TaskDeploymentDescriptor tdd1 = new TaskDeploymentDescriptor(
                        jid1,
+                       new AllocationID(),
                        jobName1,
                        vertex11,
                        execution11,
@@ -207,6 +213,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
                TaskDeploymentDescriptor tdd2 = new TaskDeploymentDescriptor(
                        jid1,
+                       new AllocationID(),
                        jobName1,
                        vertex12,
                        execution12,
@@ -222,6 +229,7 @@ public class TaskManagerGroupTest extends TestLogger {
 
                TaskDeploymentDescriptor tdd3 = new TaskDeploymentDescriptor(
                        jid2,
+                       new AllocationID(),
                        jobName2,
                        vertex21,
                        execution21,

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 558d3c2..948c129 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -68,7 +68,7 @@ public class SlotManagerTest {
                taskExecutorRegistration = 
Mockito.mock(TaskExecutorRegistration.class);
                TaskExecutorGateway gateway = 
Mockito.mock(TaskExecutorGateway.class);
                
Mockito.when(taskExecutorRegistration.getTaskExecutorGateway()).thenReturn(gateway);
-               Mockito.when(gateway.requestSlot(any(SlotID.class), 
any(AllocationID.class), any(UUID.class), any(Time.class)))
+               Mockito.when(gateway.requestSlot(any(SlotID.class), 
any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), 
any(Time.class)))
                        .thenReturn(new 
FlinkCompletableFuture<TMSlotRequestReply>());
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index 24d959e..86cd1f8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -144,7 +144,7 @@ public class SlotProtocolTest extends TestLogger {
                Mockito
                        .when(
                                taskExecutorGateway
-                                       .requestSlot(any(SlotID.class), 
any(AllocationID.class), any(UUID.class), any(Time.class)))
+                                       .requestSlot(any(SlotID.class), 
any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), 
any(Time.class)))
                        .thenReturn(new 
FlinkCompletableFuture<TMSlotRequestReply>());
                testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
@@ -161,7 +161,7 @@ public class SlotProtocolTest extends TestLogger {
 
                // 4) Slot becomes available and TaskExecutor gets a SlotRequest
                verify(taskExecutorGateway, timeout(5000))
-                       .requestSlot(eq(slotID), eq(allocationID), 
any(UUID.class), any(Time.class));
+                       .requestSlot(eq(slotID), eq(jobID), eq(allocationID), 
any(String.class), any(UUID.class), any(Time.class));
        }
 
        /**
@@ -189,7 +189,7 @@ public class SlotProtocolTest extends TestLogger {
                TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
                Mockito.when(
                        taskExecutorGateway
-                               .requestSlot(any(SlotID.class), 
any(AllocationID.class), any(UUID.class), any(Time.class)))
+                               .requestSlot(any(SlotID.class), 
any(JobID.class), any(AllocationID.class), any(String.class), any(UUID.class), 
any(Time.class)))
                        .thenReturn(new 
FlinkCompletableFuture<TMSlotRequestReply>());
                testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
@@ -240,7 +240,7 @@ public class SlotProtocolTest extends TestLogger {
 
                // 4) a SlotRequest is routed to the TaskExecutor
                verify(taskExecutorGateway, timeout(5000))
-                       .requestSlot(eq(slotID), eq(allocationID), 
any(UUID.class), any(Time.class));
+                       .requestSlot(eq(slotID), eq(jobID), eq(allocationID), 
any(String.class), any(UUID.class), any(Time.class));
        }
 
        private static TestingLeaderElectionService configureHA(

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index e7143ae..bbde331 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -287,17 +288,27 @@ public class RpcCompletenessTest extends TestLogger {
                                if 
(!futureClass.equals(RpcCompletenessTest.futureClass)) {
                                        return false;
                                } else {
-                                       Class<?> valueClass = 
ReflectionUtil.getTemplateType1(gatewayMethod.getGenericReturnType());
+                                       ReflectionUtil.FullTypeInfo 
fullValueTypeInfo = 
ReflectionUtil.getFullTemplateType(gatewayMethod.getGenericReturnType(), 0);
 
                                        if 
(endpointMethod.getReturnType().equals(futureClass)) {
-                                               Class<?> rpcEndpointValueClass 
= ReflectionUtil.getTemplateType1(endpointMethod.getGenericReturnType());
+                                               ReflectionUtil.FullTypeInfo 
fullRpcEndpointValueTypeInfo = 
ReflectionUtil.getFullTemplateType(endpointMethod.getGenericReturnType(), 0);
 
                                                // check if we have the same 
future value types
-                                               if (valueClass != null && 
rpcEndpointValueClass != null && !checkType(valueClass, rpcEndpointValueClass)) 
{
-                                                       return false;
+                                               if (fullValueTypeInfo != null 
&& fullRpcEndpointValueTypeInfo != null) {
+                                                       Iterator<Class<?>> 
valueClasses = fullValueTypeInfo.getClazzIterator();
+                                                       Iterator<Class<?>> 
rpcClasses = fullRpcEndpointValueTypeInfo.getClazzIterator();
+
+                                                       while 
(valueClasses.hasNext() && rpcClasses.hasNext()) {
+                                                               if 
(!checkType(valueClasses.next(), rpcClasses.next())) {
+                                                                       return 
false;
+                                                               }
+                                                       }
+
+                                                       // the types only match if both iterators are exhausted at the same time
+                                                       return 
!valueClasses.hasNext() && !rpcClasses.hasNext();
                                                }
                                        } else {
-                                               if (valueClass != null && 
!checkType(valueClass, endpointMethod.getReturnType())) {
+                                               if (fullValueTypeInfo != null 
&& !checkType(fullValueTypeInfo.getClazz(), endpointMethod.getReturnType())) {
                                                        return false;
                                                }
                                        }
@@ -342,16 +353,16 @@ public class RpcCompletenessTest extends TestLogger {
                if (method.getReturnType().equals(Void.TYPE)) {
                        builder.append("void").append(" ");
                } else if (method.getReturnType().equals(futureClass)) {
-                       Class<?> valueClass = 
ReflectionUtil.getTemplateType1(method.getGenericReturnType());
+                       ReflectionUtil.FullTypeInfo fullTypeInfo = 
ReflectionUtil.getFullTemplateType(method.getGenericReturnType(), 0);
 
                        builder
                                .append(futureClass.getSimpleName())
                                .append("<")
-                               .append(valueClass != null ? 
valueClass.getSimpleName() : "")
+                               .append(fullTypeInfo != null ? 
fullTypeInfo.toString() : "")
                                .append(">");
 
-                       if (valueClass != null) {
-                               
builder.append("/").append(valueClass.getSimpleName());
+                       if (fullTypeInfo != null) {
+                               builder.append("/").append(fullTypeInfo);
                        }
 
                        builder.append(" ");

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index baae251..23c6833 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -18,21 +18,45 @@
 
 package org.apache.flink.runtime.taskexecutor;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.filecache.FileCache;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.NonHaServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.query.TaskKvStateRegistry;
+import org.apache.flink.runtime.registration.RegistrationResponse;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
@@ -40,19 +64,42 @@ import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequ
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import 
org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.taskmanager.CheckpointResponder;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 
+import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 
+import org.junit.rules.TestName;
+import org.mockito.Matchers;
 import org.powermock.api.mockito.PowerMockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.InetAddress;
+import java.net.URL;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.UUID;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.*;
 import static org.mockito.Mockito.*;
+import static org.hamcrest.Matchers.contains;
 
 public class TaskExecutorTest extends TestLogger {
 
+       @Rule
+       public TestName name = new TestName();
+
+
        @Test
        public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
                final ResourceID resourceID = ResourceID.generate();
@@ -85,6 +132,8 @@ public class TaskExecutorTest extends TestLogger {
                                mock(BroadcastVariableManager.class),
                                mock(FileCache.class),
                                mock(TaskSlotTable.class),
+                               mock(JobManagerTable.class),
+                               mock(JobLeaderService.class),
                                mock(FatalErrorHandler.class));
 
                        taskManager.start();
@@ -142,6 +191,8 @@ public class TaskExecutorTest extends TestLogger {
                                mock(BroadcastVariableManager.class),
                                mock(FileCache.class),
                                mock(TaskSlotTable.class),
+                               mock(JobManagerTable.class),
+                               mock(JobLeaderService.class),
                                mock(FatalErrorHandler.class));
 
                        taskManager.start();
@@ -173,17 +224,385 @@ public class TaskExecutorTest extends TestLogger {
        }
 
        /**
+        * Tests that we can submit a task to the TaskManager given that we've 
allocated a slot there.
+        */
+       @Test(timeout = 1000L)
+       public void testTaskSubmission() throws Exception {
+               final Configuration configuration = new Configuration();
+
+               final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
+               final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
+               final JobID jobId = new JobID();
+               final AllocationID allocationId = new AllocationID();
+               final UUID jobManagerLeaderId = UUID.randomUUID();
+               final JobVertexID jobVertexId = new JobVertexID();
+
+               final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(
+                       jobId,
+                       allocationId,
+                       name.getMethodName(),
+                       jobVertexId,
+                       new ExecutionAttemptID(),
+                       new SerializedValue<>(new ExecutionConfig()),
+                       "test task",
+                       1,
+                       0,
+                       1,
+                       0,
+                       configuration,
+                       configuration,
+                       TestInvokable.class.getName(),
+                       
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                       Collections.<InputGateDeploymentDescriptor>emptyList(),
+                       Collections.<BlobKey>emptyList(),
+                       Collections.<URL>emptyList(),
+                       0);
+
+               final LibraryCacheManager libraryCacheManager = 
mock(LibraryCacheManager.class);
+               
when(libraryCacheManager.getClassLoader(eq(jobId))).thenReturn(getClass().getClassLoader());
+
+               final JobManagerConnection jobManagerConnection = new 
JobManagerConnection(
+                       mock(JobMasterGateway.class),
+                       jobManagerLeaderId,
+                       mock(TaskManagerActions.class),
+                       mock(CheckpointResponder.class),
+                       libraryCacheManager,
+                       mock(ResultPartitionConsumableNotifier.class),
+                       mock(PartitionStateChecker.class));
+
+               final JobManagerTable jobManagerTable = new JobManagerTable();
+               jobManagerTable.put(jobId, jobManagerConnection);
+
+               final TaskSlotTable taskSlotTable = mock(TaskSlotTable.class);
+               when(taskSlotTable.existsActiveSlot(eq(jobId), 
eq(allocationId))).thenReturn(true);
+               when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
+
+               final NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
+
+               when(networkEnvironment.createKvStateTaskRegistry(eq(jobId), 
eq(jobVertexId))).thenReturn(mock(TaskKvStateRegistry.class));
+
+               final TaskManagerMetricGroup taskManagerMetricGroup = 
mock(TaskManagerMetricGroup.class);
+
+               
when(taskManagerMetricGroup.addTaskForJob(eq(tdd))).thenReturn(mock(TaskMetricGroup.class));
+
+               final HighAvailabilityServices haServices = 
mock(HighAvailabilityServices.class);
+               
when(haServices.getResourceManagerLeaderRetriever()).thenReturn(mock(LeaderRetrievalService.class));
+
+               try {
+
+                       TaskExecutor taskManager = new TaskExecutor(
+                               taskManagerConfiguration,
+                               mock(TaskManagerLocation.class),
+                               rpc,
+                               mock(MemoryManager.class),
+                               mock(IOManager.class),
+                               networkEnvironment,
+                               haServices,
+                               mock(MetricRegistry.class),
+                               taskManagerMetricGroup,
+                               mock(BroadcastVariableManager.class),
+                               mock(FileCache.class),
+                               taskSlotTable,
+                               jobManagerTable,
+                               mock(JobLeaderService.class),
+                               mock(FatalErrorHandler.class));
+
+                       taskManager.start();
+
+                       taskManager.submitTask(tdd, jobManagerLeaderId);
+
+                       Future<Boolean> completionFuture = 
TestInvokable.completableFuture;
+
+                       completionFuture.get();
+
+               } finally {
+                       rpc.stopService();
+               }
+       }
+
+       /**
+        * Test invokable which completes the given future when executed.
+        */
+       public static class TestInvokable extends AbstractInvokable {
+
+               static final CompletableFuture<Boolean> completableFuture = new 
FlinkCompletableFuture<>();
+
+               @Override
+               public void invoke() throws Exception {
+                       completableFuture.complete(true);
+               }
+       }
+
+       /**
+        * Tests that a TaskManager detects a job leader for which it has reserved 
slots. Upon detecting
+        * the job leader, it will offer all reserved slots to the JobManager.
+        */
+       @Test
+       public void testJobLeaderDetection() throws 
TestingFatalErrorHandler.TestingException, SlotAllocationException {
+               final JobID jobId = new JobID();
+
+               final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
+               final Configuration configuration = new Configuration();
+               final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
+               final ResourceID resourceId = new ResourceID("foobar");
+               final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
+               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+               final TimerService<AllocationID> timerService = 
mock(TimerService.class);
+               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(mock(ResourceProfile.class)), timerService);
+               final JobManagerTable jobManagerTable = new JobManagerTable();
+               final JobLeaderService jobLeaderService = new 
JobLeaderService(resourceId);
+               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
+
+               final TestingLeaderRetrievalService 
resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
+               final TestingLeaderRetrievalService 
jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService();
+               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
+               haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetrievalService);
+
+               final String resourceManagerAddress = "rm";
+               final UUID resourceManagerLeaderId = UUID.randomUUID();
+
+               final ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
+               final InstanceID registrationId = new InstanceID();
+
+               when(resourceManagerGateway.registerTaskExecutor(
+                       eq(resourceManagerLeaderId),
+                       any(String.class),
+                       eq(resourceId),
+                       any(SlotReport.class),
+                       
any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new
 TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+
+               final String jobManagerAddress = "jm";
+               final UUID jobManagerLeaderId = UUID.randomUUID();
+               final ResourceID jmResourceId = new 
ResourceID(jobManagerAddress);
+               final int blobPort = 42;
+
+               final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
+
+               when(jobMasterGateway.registerTaskManager(
+                       any(String.class),
+                       eq(resourceId),
+                       eq(jobManagerLeaderId),
+                       any(Time.class)
+               
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
+               
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+
+               rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+               rpc.registerGateway(jobManagerAddress, jobMasterGateway);
+
+               final AllocationID allocationId = new AllocationID();
+               final SlotID slotId = new SlotID(resourceId, 0);
+
+               try {
+                       TaskExecutor taskManager = new TaskExecutor(
+                               taskManagerConfiguration,
+                               taskManagerLocation,
+                               rpc,
+                               mock(MemoryManager.class),
+                               mock(IOManager.class),
+                               mock(NetworkEnvironment.class),
+                               haServices,
+                               mock(MetricRegistry.class),
+                               mock(TaskManagerMetricGroup.class),
+                               mock(BroadcastVariableManager.class),
+                               mock(FileCache.class),
+                               taskSlotTable,
+                               jobManagerTable,
+                               jobLeaderService,
+                               testingFatalErrorHandler);
+
+                       taskManager.start();
+
+                       // tell the task manager about the rm leader
+                       
resourceManagerLeaderRetrievalService.notifyListener(resourceManagerAddress, 
resourceManagerLeaderId);
+
+                       // request slots from the task manager under the given 
allocation id
+                       TMSlotRequestReply reply = 
taskManager.requestSlot(slotId, jobId, allocationId, jobManagerAddress, 
resourceManagerLeaderId);
+
+                       // this is hopefully successful :-)
+                       assertTrue(reply instanceof TMSlotRequestRegistered);
+
+                       // now inform the task manager about the new job leader
+                       
jobManagerLeaderRetrievalService.notifyListener(jobManagerAddress, 
jobManagerLeaderId);
+
+                       // the job leader should get the allocation id offered
+                       
verify(jobMasterGateway).offerSlots((Iterable<AllocationID>)Matchers.argThat(contains(allocationId)),
 eq(jobManagerLeaderId), any(Time.class));
+               } finally {
+                       // check if a concurrent error occurred
+                       testingFatalErrorHandler.rethrowException();
+
+                       rpc.stopService();
+               }
+       }
+
+       /**
+        * Tests that accepted slots go into state assigned and the others are 
returned to the resource
+        * manager.
+        */
+       @Test
+       public void testSlotAcceptance() throws Exception {
+               final JobID jobId = new JobID();
+
+               final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
+               final Configuration configuration = new Configuration();
+               final TaskManagerConfiguration taskManagerConfiguration = 
TaskManagerConfiguration.fromConfiguration(configuration);
+               final ResourceID resourceId = new ResourceID("foobar");
+               final TaskManagerLocation taskManagerLocation = new 
TaskManagerLocation(resourceId, InetAddress.getLoopbackAddress(), 1234);
+               final TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+               final TimerService<AllocationID> timerService = 
mock(TimerService.class);
+               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(Arrays.asList(mock(ResourceProfile.class), 
mock(ResourceProfile.class)), timerService);
+               final JobManagerTable jobManagerTable = new JobManagerTable();
+               final JobLeaderService jobLeaderService = new 
JobLeaderService(resourceId);
+               final TestingFatalErrorHandler testingFatalErrorHandler = new 
TestingFatalErrorHandler();
+
+               final String resourceManagerAddress = "rm";
+               final UUID resourceManagerLeaderId = UUID.randomUUID();
+
+               final String jobManagerAddress = "jm";
+               final UUID jobManagerLeaderId = UUID.randomUUID();
+
+               final LeaderRetrievalService 
resourceManagerLeaderRetrievalService = new 
TestingLeaderRetrievalService(resourceManagerAddress, resourceManagerLeaderId);
+               final LeaderRetrievalService jobManagerLeaderRetrievalService = 
new TestingLeaderRetrievalService(jobManagerAddress, jobManagerLeaderId);
+               
haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService);
+               haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetrievalService);
+
+               final ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
+               final InstanceID registrationId = new InstanceID();
+
+               when(resourceManagerGateway.registerTaskExecutor(
+                       eq(resourceManagerLeaderId),
+                       any(String.class),
+                       eq(resourceId),
+                       any(SlotReport.class),
+                       
any(Time.class))).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new
 TaskExecutorRegistrationSuccess(registrationId, 1000L)));
+
+               final ResourceID jmResourceId = new 
ResourceID(jobManagerAddress);
+               final int blobPort = 42;
+
+               final AllocationID allocationId1 = new AllocationID();
+               final AllocationID allocationId2 = new AllocationID();
+
+               final JobMasterGateway jobMasterGateway = 
mock(JobMasterGateway.class);
+
+               when(jobMasterGateway.registerTaskManager(
+                       any(String.class),
+                       eq(resourceId),
+                       eq(jobManagerLeaderId),
+                       any(Time.class)
+               
)).thenReturn(FlinkCompletableFuture.<RegistrationResponse>completed(new 
JMTMRegistrationSuccess(jmResourceId, blobPort)));
+               
when(jobMasterGateway.getAddress()).thenReturn(jobManagerAddress);
+
+               when(jobMasterGateway.offerSlots(any(Iterable.class), 
eq(jobManagerLeaderId), any(Time.class)))
+                       
.thenReturn(FlinkCompletableFuture.completed((Iterable<AllocationID>)Collections.singleton(allocationId1)));
+
+               rpc.registerGateway(resourceManagerAddress, 
resourceManagerGateway);
+               rpc.registerGateway(jobManagerAddress, jobMasterGateway);
+
+               try {
+                       TaskExecutor taskManager = new TaskExecutor(
+                               taskManagerConfiguration,
+                               taskManagerLocation,
+                               rpc,
+                               mock(MemoryManager.class),
+                               mock(IOManager.class),
+                               mock(NetworkEnvironment.class),
+                               haServices,
+                               mock(MetricRegistry.class),
+                               mock(TaskManagerMetricGroup.class),
+                               mock(BroadcastVariableManager.class),
+                               mock(FileCache.class),
+                               taskSlotTable,
+                               jobManagerTable,
+                               jobLeaderService,
+                               testingFatalErrorHandler);
+
+                       taskManager.start();
+
+                       taskSlotTable.allocateSlot(0, jobId, allocationId1, 
Time.milliseconds(10000L));
+                       taskSlotTable.allocateSlot(1, jobId, allocationId2, 
Time.milliseconds(10000L));
+
+                       // we have to add the job after starting the TaskExecutor, 
because otherwise the service has not
+                       // been properly started.
+                       jobLeaderService.addJob(jobId, jobManagerAddress);
+
+                       
verify(resourceManagerGateway).notifySlotAvailable(eq(resourceManagerLeaderId), 
eq(registrationId), eq(new SlotID(resourceId, 1)));
+
+                       assertTrue(taskSlotTable.existsActiveSlot(jobId, 
allocationId1));
+                       assertFalse(taskSlotTable.existsActiveSlot(jobId, 
allocationId2));
+                       assertTrue(taskSlotTable.isSlotFree(1));
+               } finally {
+                       // check if a concurrent error occurred
+                       testingFatalErrorHandler.rethrowException();
+
+                       rpc.stopService();
+               }
+       }
+
+       private static class TestingFatalErrorHandler implements 
FatalErrorHandler {
+               private static final Logger LOG = 
LoggerFactory.getLogger(TestingFatalErrorHandler.class);
+               private final AtomicReference<Throwable> atomicThrowable;
+
+               public TestingFatalErrorHandler() {
+                       atomicThrowable = new AtomicReference<>(null);
+               }
+
+               public void rethrowException() throws TestingException {
+                       Throwable throwable = atomicThrowable.get();
+
+                       if (throwable != null) {
+                               throw new TestingException(throwable);
+                       }
+               }
+
+               public boolean hasExceptionOccurred() {
+                       return atomicThrowable.get() != null;
+               }
+
+               public Throwable getException() {
+                       return atomicThrowable.get();
+               }
+
+               @Override
+               public void onFatalError(Throwable exception) {
+                       LOG.error("OnFatalError:", exception);
+                       atomicThrowable.compareAndSet(null, exception);
+               }
+
+               
//------------------------------------------------------------------
+               // static utility classes
+               
//------------------------------------------------------------------
+
+               private static final class TestingException extends Exception {
+                       public TestingException(String message) {
+                               super(message);
+                       }
+
+                       public TestingException(String message, Throwable 
cause) {
+                               super(message, cause);
+                       }
+
+                       public TestingException(Throwable cause) {
+                               super(cause);
+                       }
+
+                       private static final long serialVersionUID = 
-4648195335470914498L;
+               }
+       }
+
+       /**
         * Tests that all allocation requests for slots are ignored if the slot 
has been reported as
         * free by the TaskExecutor but this report hasn't been confirmed by 
the ResourceManager.
         *
         * This is essential for the correctness of the state of the 
ResourceManager.
         */
+       @Ignore
        @Test
-       public void testRejectAllocationRequestsForOutOfSyncSlots() {
+       public void testRejectAllocationRequestsForOutOfSyncSlots() throws 
SlotAllocationException {
                final ResourceID resourceID = ResourceID.generate();
 
                final String address1 = "/resource/manager/address/one";
                final UUID leaderId = UUID.randomUUID();
+               final JobID jobId = new JobID();
+               final String jobManagerAddress = "foobar";
 
                final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
                try {
@@ -215,6 +634,8 @@ public class TaskExecutorTest extends TestLogger {
                                mock(BroadcastVariableManager.class),
                                mock(FileCache.class),
                                mock(TaskSlotTable.class),
+                               mock(JobManagerTable.class),
+                               mock(JobLeaderService.class),
                                mock(FatalErrorHandler.class));
 
                        taskManager.start();
@@ -232,14 +653,14 @@ public class TaskExecutorTest extends TestLogger {
 
                        // test that allocating a slot works
                        final SlotID slotID = new SlotID(resourceID, 0);
-                       TMSlotRequestReply tmSlotRequestReply = 
taskManager.requestSlot(slotID, new AllocationID(), leaderId);
+                       TMSlotRequestReply tmSlotRequestReply = 
taskManager.requestSlot(slotID, jobId, new AllocationID(), jobManagerAddress, 
leaderId);
                        assertTrue(tmSlotRequestReply instanceof 
TMSlotRequestRegistered);
 
+                       // TODO: Figure out the concrete allocation behaviour 
between RM and TM. Maybe we don't need the SlotID...
                        // test that we can't allocate slots which are 
blacklisted due to pending confirmation of the RM
                        final SlotID unconfirmedFreeSlotID = new 
SlotID(resourceID, 1);
-                       
taskManager.addUnconfirmedFreeSlotNotification(unconfirmedFreeSlotID);
                        TMSlotRequestReply tmSlotRequestReply2 =
-                               taskManager.requestSlot(unconfirmedFreeSlotID, 
new AllocationID(), leaderId);
+                               taskManager.requestSlot(unconfirmedFreeSlotID, 
jobId, new AllocationID(), jobManagerAddress, leaderId);
                        assertTrue(tmSlotRequestReply2 instanceof 
TMSlotRequestRejected);
 
                        // re-register
@@ -250,7 +671,7 @@ public class TaskExecutorTest extends TestLogger {
                        // now we should be successful because the slots status 
has been synced
                        // test that we can't allocate slots which are 
blacklisted due to pending confirmation of the RM
                        TMSlotRequestReply tmSlotRequestReply3 =
-                               taskManager.requestSlot(unconfirmedFreeSlotID, 
new AllocationID(), leaderId);
+                               taskManager.requestSlot(unconfirmedFreeSlotID, 
jobId, new AllocationID(), jobManagerAddress, leaderId);
                        assertTrue(tmSlotRequestReply3 instanceof 
TMSlotRequestRegistered);
 
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index eb7f3c5..e027d78 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
@@ -159,7 +160,7 @@ public class TaskAsyncCallTest {
                                .thenReturn(mock(TaskKvStateRegistry.class));
 
                TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-                               new JobID(), "Job Name", new JobVertexID(), new 
ExecutionAttemptID(),
+                               new JobID(), new AllocationID(), "Job Name", 
new JobVertexID(), new ExecutionAttemptID(),
                                new SerializedValue<>(new ExecutionConfig()),
                                "Test Task", 1, 0, 1, 0,
                                new Configuration(), new Configuration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index d4efd24..d1909fe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.akka.FlinkUntypedActor;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
@@ -163,12 +164,13 @@ public class TaskManagerTest extends TestLogger {
                                final ExecutionAttemptID eid = new 
ExecutionAttemptID();
                                final SerializedValue<ExecutionConfig> 
executionConfig = new SerializedValue<>(new ExecutionConfig());
 
-                               final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(jid, "TestJob", vid, eid, executionConfig,
-                                               "TestTask", 7, 2, 7, 0, new 
Configuration(), new Configuration(),
-                                               
TestInvokableCorrect.class.getName(),
-                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                                               new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
+                               final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(jid, new AllocationID(),
+                                       "TestJob", vid, eid, executionConfig,
+                                       "TestTask", 7, 2, 7, 0, new 
Configuration(), new Configuration(),
+                                       TestInvokableCorrect.class.getName(),
+                                       
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                                       
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                                       new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
 
                                new Within(d) {
@@ -265,7 +267,7 @@ public class TaskManagerTest extends TestLogger {
                                final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
 
                                final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(
-                                               jid1, "TestJob1", vid1, eid1,
+                                               jid1, new AllocationID(), 
"TestJob1", vid1, eid1,
                                                new SerializedValue<>(new 
ExecutionConfig()),
                                                "TestTask1", 5, 1, 5, 0,
                                                new Configuration(), new 
Configuration(), TestInvokableBlockingCancelable.class.getName(),
@@ -274,7 +276,7 @@ public class TaskManagerTest extends TestLogger {
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
                                final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(
-                                               jid2, "TestJob2", vid2, eid2,
+                                               jid2, new AllocationID(), 
"TestJob2", vid2, eid2,
                                                new SerializedValue<>(new 
ExecutionConfig()),
                                                "TestTask2", 7, 2, 7, 0,
                                                new Configuration(), new 
Configuration(), TestInvokableBlockingCancelable.class.getName(),
@@ -404,13 +406,13 @@ public class TaskManagerTest extends TestLogger {
 
                                final SerializedValue<ExecutionConfig> 
executionConfig = new SerializedValue<>(new ExecutionConfig());
 
-                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid1, "TestJob", vid1, eid1, executionConfig,
+                               final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(jid1, new AllocationID(), "TestJob", vid1, eid1, 
executionConfig,
                                                "TestTask1", 5, 1, 5, 0, new 
Configuration(), new Configuration(), StoppableInvokable.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
-                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid2, "TestJob", vid2, eid2, executionConfig,
+                               final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(jid2, new AllocationID(), "TestJob", vid2, eid2, 
executionConfig,
                                                "TestTask2", 7, 2, 7, 0, new 
Configuration(), new Configuration(), 
TestInvokableBlockingCancelable.class.getName(),
                                                
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
                                                
Collections.<InputGateDeploymentDescriptor>emptyList(),
@@ -531,7 +533,7 @@ public class TaskManagerTest extends TestLogger {
                                final ExecutionAttemptID eid2 = new 
ExecutionAttemptID();
 
                                final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(
-                                               jid, "TestJob", vid1, eid1,
+                                               jid, new AllocationID(), 
"TestJob", vid1, eid1,
                                                new SerializedValue<>(new 
ExecutionConfig()),
                                                "Sender", 1, 0, 1, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Sender.class.getName(),
@@ -540,7 +542,7 @@ public class TaskManagerTest extends TestLogger {
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
                                final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(
-                                               jid, "TestJob", vid2, eid2,
+                                               jid, new AllocationID(), 
"TestJob", vid2, eid2,
                                                new SerializedValue<>(new 
ExecutionConfig()),
                                                "Receiver", 7, 2, 7, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Receiver.class.getName(),
@@ -636,7 +638,7 @@ public class TaskManagerTest extends TestLogger {
                                                );
 
                                final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(
-                                               jid, "TestJob", vid1, eid1,
+                                               jid, new AllocationID(), 
"TestJob", vid1, eid1,
                                                new SerializedValue<>(new 
ExecutionConfig()),
                                                "Sender", 1, 0, 1, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Sender.class.getName(),
@@ -644,7 +646,7 @@ public class TaskManagerTest extends TestLogger {
                                                Collections.<URL>emptyList(), 
0);
 
                                final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(
-                                               jid, "TestJob", vid2, eid2,
+                                               jid, new AllocationID(), 
"TestJob", vid2, eid2,
                                                new SerializedValue<>(new 
ExecutionConfig()),
                                                "Receiver", 7, 2, 7, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Receiver.class.getName(),
@@ -781,7 +783,7 @@ public class TaskManagerTest extends TestLogger {
                                                );
 
                                final TaskDeploymentDescriptor tdd1 = new 
TaskDeploymentDescriptor(
-                                               jid, "TestJob", vid1, eid1,
+                                               jid, new AllocationID(), 
"TestJob", vid1, eid1,
                                                new SerializedValue<>(new 
ExecutionConfig()),
                                                "Sender", 1, 0, 1, 0,
                                                new Configuration(), new 
Configuration(), Tasks.Sender.class.getName(),
@@ -789,7 +791,7 @@ public class TaskManagerTest extends TestLogger {
                                                new ArrayList<BlobKey>(), 
Collections.<URL>emptyList(), 0);
 
                                final TaskDeploymentDescriptor tdd2 = new 
TaskDeploymentDescriptor(
-                                               jid, "TestJob", vid2, eid2,
+                                               jid, new AllocationID(), 
"TestJob", vid2, eid2,
                                                new SerializedValue<>(new 
ExecutionConfig()),
                                                "Receiver", 7, 2, 7, 0,
                                                new Configuration(), new 
Configuration(), Tasks.BlockingReceiver.class.getName(),
@@ -929,7 +931,7 @@ public class TaskManagerTest extends TestLogger {
                                                new 
InputGateDeploymentDescriptor(resultId, 0, icdd);
 
                                final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(
-                                               jid, "TestJob", vid, eid,
+                                               jid, new AllocationID(), 
"TestJob", vid, eid,
                                                new SerializedValue<>(new 
ExecutionConfig()),
                                                "Receiver", 1, 0, 1, 0,
                                                new Configuration(), new 
Configuration(),
@@ -1022,7 +1024,7 @@ public class TaskManagerTest extends TestLogger {
                                                new 
InputGateDeploymentDescriptor(resultId, 0, icdd);
 
                                final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(
-                                               jid, "TestJob", vid, eid,
+                                               jid, new AllocationID(), 
"TestJob", vid, eid,
                                                new SerializedValue<>(new 
ExecutionConfig()),
                                                "Receiver", 1, 0, 1, 0,
                                                new Configuration(), new 
Configuration(),
@@ -1097,24 +1099,25 @@ public class TaskManagerTest extends TestLogger {
 
                                // Single blocking task
                                final TaskDeploymentDescriptor tdd = new 
TaskDeploymentDescriptor(
-                                               new JobID(),
-                                               "Job",
-                                               new JobVertexID(),
-                                               new ExecutionAttemptID(),
-                                               new SerializedValue<>(new 
ExecutionConfig()),
-                                               "Task",
-                                               1,
-                                               0,
-                                               1,
-                                               0,
-                                               new Configuration(),
-                                               new Configuration(),
-                                               
Tasks.BlockingNoOpInvokable.class.getName(),
-                                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                                               
Collections.<BlobKey>emptyList(),
-                                               Collections.<URL>emptyList(),
-                                               0);
+                                       new JobID(),
+                                       new AllocationID(),
+                                       "Job",
+                                       new JobVertexID(),
+                                       new ExecutionAttemptID(),
+                                       new SerializedValue<>(new 
ExecutionConfig()),
+                                       "Task",
+                                       1,
+                                       0,
+                                       1,
+                                       0,
+                                       new Configuration(),
+                                       new Configuration(),
+                                       
Tasks.BlockingNoOpInvokable.class.getName(),
+                                       
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                                       
Collections.<InputGateDeploymentDescriptor>emptyList(),
+                                       Collections.<BlobKey>emptyList(),
+                                       Collections.<URL>emptyList(),
+                                       0);
 
                                // Submit the task
                                new Within(d) {

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index 5d3eb3a..7d466f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -69,7 +69,7 @@ public class TaskStopTest {
                when(tddMock.getTaskConfiguration()).thenReturn(mock(Configuration.class));
                when(tddMock.getSerializedExecutionConfig()).thenReturn(mock(SerializedValue.class));
                when(tddMock.getInvokableClassName()).thenReturn("className");
-               when(tddMock.getAllocationID()).thenReturn(mock(AllocationID.class));
+               when(tddMock.getAllocationId()).thenReturn(mock(AllocationID.class));
 
                task = new Task(
                        tddMock,

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
index 50fc181..c5a9f2d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -665,16 +666,18 @@ public class TaskTest {
                }
                
                return new TaskDeploymentDescriptor(
-                               new JobID(), "Test Job", new JobVertexID(), new 
ExecutionAttemptID(),
-                               execConfig,
-                               "Test Task", 1, 0, 1, 0,
-                               new Configuration(), new Configuration(),
-                               invokable.getName(),
-                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                               Collections.<BlobKey>emptyList(),
-                               Collections.<URL>emptyList(),
-                               0);
+                       new JobID(),
+                       new AllocationID(),
+                       "Test Job", new JobVertexID(), new ExecutionAttemptID(),
+                       execConfig,
+                       "Test Task", 1, 0, 1, 0,
+                       new Configuration(), new Configuration(),
+                       invokable.getName(),
+                       
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                       Collections.<InputGateDeploymentDescriptor>emptyList(),
+                       Collections.<BlobKey>emptyList(),
+                       Collections.<URL>emptyList(),
+                       0);
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index fb1b3b3..e84f594 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -129,24 +130,25 @@ public class InterruptSensitiveRestoreTest {
                List<Collection<OperatorStateHandle>> 
partitionableOperatorState = Collections.emptyList();
 
                return new TaskDeploymentDescriptor(
-                               new JobID(),
-                               "test job name",
-                               new JobVertexID(),
-                               new ExecutionAttemptID(),
-                               new SerializedValue<>(new ExecutionConfig()),
-                               "test task name",
-                               1, 0, 1, 0,
-                               new Configuration(),
-                               taskConfig,
-                               SourceStreamTask.class.getName(),
-                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                               Collections.<BlobKey>emptyList(),
-                               Collections.<URL>emptyList(),
-                               0,
-                               operatorState,
-                               keyGroupState,
-                               partitionableOperatorState);
+                       new JobID(),
+                       new AllocationID(),
+                       "test job name",
+                       new JobVertexID(),
+                       new ExecutionAttemptID(),
+                       new SerializedValue<>(new ExecutionConfig()),
+                       "test task name",
+                       1, 0, 1, 0,
+                       new Configuration(),
+                       taskConfig,
+                       SourceStreamTask.class.getName(),
+                       
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                       Collections.<InputGateDeploymentDescriptor>emptyList(),
+                       Collections.<BlobKey>emptyList(),
+                       Collections.<URL>emptyList(),
+                       0,
+                       operatorState,
+                       keyGroupState,
+                       partitionableOperatorState);
        }
 
        private static Task createTask(TaskDeploymentDescriptor tdd) throws 
IOException {

http://git-wip-us.apache.org/repos/asf/flink/blob/92880056/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 9548595..abbf10c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -220,17 +221,19 @@ public class StreamTaskTest {
                                .thenReturn(mock(TaskKvStateRegistry.class));
 
                TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
-                               new JobID(), "Job Name", new JobVertexID(), new 
ExecutionAttemptID(),
-                               new SerializedValue<>(new ExecutionConfig()),
-                               "Test Task", 1, 0, 1, 0,
-                               new Configuration(),
-                               taskConfig.getConfiguration(),
-                               invokable.getName(),
-                               
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
-                               
Collections.<InputGateDeploymentDescriptor>emptyList(),
-                               Collections.<BlobKey>emptyList(),
-                               Collections.<URL>emptyList(),
-                               0);
+                       new JobID(),
+                       new AllocationID(),
+                       "Job Name", new JobVertexID(), new ExecutionAttemptID(),
+                       new SerializedValue<>(new ExecutionConfig()),
+                       "Test Task", 1, 0, 1, 0,
+                       new Configuration(),
+                       taskConfig.getConfiguration(),
+                       invokable.getName(),
+                       
Collections.<ResultPartitionDeploymentDescriptor>emptyList(),
+                       Collections.<InputGateDeploymentDescriptor>emptyList(),
+                       Collections.<BlobKey>emptyList(),
+                       Collections.<URL>emptyList(),
+                       0);
 
                return new Task(
                        tdd,

Reply via email to