Repository: flink
Updated Branches:
  refs/heads/flip-6 7aca811df -> 9da76dcfd


[FLINK-4489] [tm] Add TaskSlotTable to manage slot allocations for multiple job 
managers

Add TimerService for slot timeouts

Add task and task slot access methods

Add comments to newly introduced classes

This closes #2638.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9da76dcf
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9da76dcf
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9da76dcf

Branch: refs/heads/flip-6
Commit: 9da76dcfde689e8f4516880459b80c448233aec5
Parents: 7aca811
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Oct 5 11:58:26 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Sat Oct 15 11:31:59 2016 +0200

----------------------------------------------------------------------
 .../runtime/clusterframework/types/SlotID.java  |   4 +-
 .../resourcemanager/ResourceManager.java        |  13 +-
 .../resourcemanager/ResourceManagerGateway.java |  13 +-
 .../runtime/taskexecutor/TaskExecutor.java      | 194 +++---
 .../runtime/taskexecutor/TaskManagerRunner.java |   1 +
 .../taskexecutor/TaskManagerServices.java       |  29 +-
 .../TaskManagerServicesConfiguration.java       |   2 +
 .../flink/runtime/taskexecutor/TaskSlot.java    |  73 --
 .../runtime/taskexecutor/TaskSlotMapping.java   |  44 --
 .../runtime/taskexecutor/slot/SlotActions.java  |  45 ++
 .../slot/SlotNotActiveException.java            |  34 +
 .../slot/SlotNotFoundException.java             |  37 +
 .../runtime/taskexecutor/slot/TaskSlot.java     | 289 ++++++++
 .../taskexecutor/slot/TaskSlotState.java        |  29 +
 .../taskexecutor/slot/TaskSlotTable.java        | 682 +++++++++++++++++++
 .../taskexecutor/slot/TimeoutListener.java      |  37 +
 .../runtime/taskexecutor/slot/TimerService.java | 160 +++++
 .../apache/flink/runtime/taskmanager/Task.java  |   8 +-
 .../PartialConsumePipelinedResultTest.java      |   1 -
 .../runtime/taskexecutor/TaskExecutorTest.java  |   4 +
 20 files changed, 1457 insertions(+), 242 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
index 237597b..d6409b6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/SlotID.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.clusterframework.types;
 
 import java.io.Serializable;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -34,8 +35,9 @@ public class SlotID implements ResourceIDRetrievable, 
Serializable {
 
        /** The numeric id for single slot */
        private final int slotNumber;
-
+       
        public SlotID(ResourceID resourceId, int slotNumber) {
+               checkArgument(0 <= slotNumber, "Slot number must be positive.");
                this.resourceId = checkNotNull(resourceId, "ResourceID must not 
be null");
                this.slotNumber = slotNumber;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 8fbb34b..3122804 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -40,7 +40,6 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
 import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
 import 
org.apache.flink.runtime.resourcemanager.registration.JobMasterRegistration;
 import 
org.apache.flink.runtime.resourcemanager.registration.WorkerRegistration;
 import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
@@ -337,34 +336,32 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
        /**
         * Notification from a TaskExecutor that a slot has become available
         * @param resourceManagerLeaderId TaskExecutor's resource manager 
leader id
-        * @param resourceID TaskExecutor's resource id
         * @param instanceID TaskExecutor's instance id
         * @param slotID The slot id of the available slot
         * @return SlotAvailableReply
         */
        @RpcMethod
-       public SlotAvailableReply notifySlotAvailable(
+       public void notifySlotAvailable(
                        final UUID resourceManagerLeaderId,
-                       final ResourceID resourceID,
                        final InstanceID instanceID,
                        final SlotID slotID) {
 
                if (resourceManagerLeaderId.equals(leaderSessionID)) {
-                       WorkerRegistration<WorkerType> registration = 
taskExecutors.get(resourceID);
+                       final ResourceID resourceId = slotID.getResourceID();
+                       WorkerRegistration<WorkerType> registration = 
taskExecutors.get(resourceId);
+
                        if (registration != null) {
                                InstanceID registrationInstanceID = 
registration.getInstanceID();
                                if (registrationInstanceID.equals(instanceID)) {
                                        runAsync(new Runnable() {
                                                @Override
                                                public void run() {
-                                                       
slotManager.notifySlotAvailable(resourceID, slotID);
+                                                       
slotManager.notifySlotAvailable(resourceId, slotID);
                                                }
                                        });
-                                       return new 
SlotAvailableReply(leaderSessionID, slotID);
                                }
                        }
                }
-               return null;
        }
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 07e9e43..968eeb8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.instance.InstanceID;
 import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
-import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.SlotAvailableReply;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcTimeout;
 import org.apache.flink.runtime.jobmaster.JobMaster;
@@ -94,17 +93,13 @@ public interface ResourceManagerGateway extends RpcGateway {
         * Sent by the TaskExecutor to notify the ResourceManager that a slot 
has become available.
         *
         * @param resourceManagerLeaderId The ResourceManager leader id
-        * @param resourceID The ResourceID of the TaskExecutor
-        * @param instanceID The InstanceID of the TaskExecutor
+        * @param instanceId TaskExecutor's instance id
         * @param slotID The SlotID of the freed slot
-        * @return The confirmation by the ResourceManager
         */
-       Future<SlotAvailableReply> notifySlotAvailable(
+       void notifySlotAvailable(
                UUID resourceManagerLeaderId,
-               ResourceID resourceID,
-               InstanceID instanceID,
-               SlotID slotID,
-               @RpcTimeout Time timeout);
+               InstanceID instanceId,
+               SlotID slotID);
 
        /**
         * Registers an infoMessage listener

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 9f9234f..e642315 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -47,6 +47,7 @@ import 
org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
@@ -64,6 +65,10 @@ import 
org.apache.flink.runtime.taskexecutor.rpc.RpcCheckpointResponder;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcInputSplitProvider;
 import org.apache.flink.runtime.taskexecutor.rpc.RpcPartitionStateChecker;
 import 
org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNotifier;
+import org.apache.flink.runtime.taskexecutor.slot.SlotActions;
+import org.apache.flink.runtime.taskexecutor.slot.SlotNotActiveException;
+import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskExecutionState;
@@ -76,7 +81,6 @@ import java.net.InetSocketAddress;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
@@ -132,13 +136,9 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
        private Map<ResourceID, JobManagerConnection> jobManagerConnections;
 
-       // --------- Slot allocation table --------
+       // --------- task slot allocation table -----------
 
-       private Map<AllocationID, TaskSlot> taskSlots;
-
-       // --------- Slot allocation table --------
-
-       private Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
+       private final TaskSlotTable taskSlotTable;
 
        // 
------------------------------------------------------------------------
 
@@ -154,6 +154,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                TaskManagerMetricGroup taskManagerMetricGroup,
                BroadcastVariableManager broadcastVariableManager,
                FileCache fileCache,
+               TaskSlotTable taskSlotTable,
                FatalErrorHandler fatalErrorHandler) {
 
                super(rpcService);
@@ -167,6 +168,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                this.networkEnvironment = checkNotNull(networkEnvironment);
                this.haServices = checkNotNull(haServices);
                this.metricRegistry = checkNotNull(metricRegistry);
+               this.taskSlotTable = checkNotNull(taskSlotTable);
                this.fatalErrorHandler = checkNotNull(fatalErrorHandler);
                this.taskManagerMetricGroup = 
checkNotNull(taskManagerMetricGroup);
                this.broadcastVariableManager = 
checkNotNull(broadcastVariableManager);
@@ -175,8 +177,6 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                this.jobManagerConnections = new HashMap<>(4);
 
                this.unconfirmedFreeSlots = new HashSet<>();
-               this.taskSlots = new 
HashMap<>(taskManagerConfiguration.getNumberSlots());
-               this.taskSlotMappings = new 
HashMap<>(taskManagerConfiguration.getNumberSlots() * 2);
        }
 
        // 
------------------------------------------------------------------------
@@ -193,6 +193,9 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                } catch (Exception e) {
                        onFatalErrorAsync(e);
                }
+
+               // tell the task slot table who's responsible for the task slot 
actions
+               taskSlotTable.start(new SlotActionsImpl(), 
taskManagerConfiguration.getTimeout());
        }
 
        /**
@@ -202,6 +205,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        public void shutDown() {
                log.info("Stopping TaskManager {}.", getAddress());
 
+               taskSlotTable.stop();
+
                if (resourceManagerConnection.isConnected()) {
                        try {
                                resourceManagerConnection.close();
@@ -264,10 +269,9 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                        throw new TaskSubmissionException(message);
                }
 
-               TaskSlot taskSlot = taskSlots.get(tdd.getAllocationID());
-
-               if (taskSlot == null) {
-                       final String message = "No task slot allocated for 
allocation ID " + tdd.getAllocationID() + '.';
+               if (!taskSlotTable.existActiveSlot(tdd.getJobID(), 
tdd.getAllocationID())) {
+                       final String message = "No task slot allocated for job 
ID " + tdd.getJobID() +
+                               " and allocation ID " + tdd.getAllocationID() + 
'.';
                        log.debug(message);
                        throw new TaskSubmissionException(message);
                }
@@ -307,10 +311,15 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
                log.info("Received task {}.", 
task.getTaskInfo().getTaskNameWithSubtasks());
 
-               if(taskSlot.add(task)) {
-                       TaskSlotMapping taskSlotMapping = new 
TaskSlotMapping(task, taskSlot);
+               boolean taskAdded;
 
-                       taskSlotMappings.put(task.getExecutionId(), 
taskSlotMapping);
+               try {
+                       taskAdded = taskSlotTable.addTask(task);
+               } catch (SlotNotFoundException | SlotNotActiveException e) {
+                       throw new TaskSubmissionException("Could not submit 
task.", e);
+               }
+
+               if (taskAdded) {
                        task.startTaskThread();
 
                        return Acknowledge.get();
@@ -325,7 +334,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
        @RpcMethod
        public Acknowledge cancelTask(ExecutionAttemptID executionAttemptID) 
throws TaskException {
-               final Task task = getTask(executionAttemptID);
+               final Task task = taskSlotTable.getTask(executionAttemptID);
 
                if (task != null) {
                        try {
@@ -344,7 +353,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
        @RpcMethod
        public Acknowledge stopTask(ExecutionAttemptID executionAttemptID) 
throws TaskException {
-               final Task task = getTask(executionAttemptID);
+               final Task task = taskSlotTable.getTask(executionAttemptID);
 
                if (task != null) {
                        try {
@@ -367,7 +376,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
        @RpcMethod
        public Acknowledge updatePartitions(final ExecutionAttemptID 
executionAttemptID, Collection<PartitionInfo> partitionInfos) throws 
PartitionException {
-               final Task task = getTask(executionAttemptID);
+               final Task task = taskSlotTable.getTask(executionAttemptID);
 
                if (task != null) {
                        for (final PartitionInfo partitionInfo: partitionInfos) 
{
@@ -430,7 +439,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        public Acknowledge triggerCheckpoint(ExecutionAttemptID 
executionAttemptID, long checkpointId, long checkpointTimestamp) throws 
CheckpointException {
                log.debug("Trigger checkpoint {}@{} for {}.", checkpointId, 
checkpointTimestamp, executionAttemptID);
 
-               final Task task = getTask(executionAttemptID);
+               final Task task = taskSlotTable.getTask(executionAttemptID);
 
                if (task != null) {
                        task.triggerCheckpointBarrier(checkpointId, 
checkpointTimestamp);
@@ -448,7 +457,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        public Acknowledge confirmCheckpoint(ExecutionAttemptID 
executionAttemptID, long checkpointId, long checkpointTimestamp) throws 
CheckpointException {
                log.debug("Confirm checkpoint {}@{} for {}.", checkpointId, 
checkpointTimestamp, executionAttemptID);
 
-               final Task task = getTask(executionAttemptID);
+               final Task task = taskSlotTable.getTask(executionAttemptID);
 
                if (task != null) {
                        task.notifyCheckpointComplete(checkpointId);
@@ -494,68 +503,8 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                return jobManagerConnections.get(jobManagerID);
        }
 
-       private Task getTask(ExecutionAttemptID executionAttemptID) {
-               TaskSlotMapping taskSlotMapping = 
taskSlotMappings.get(executionAttemptID);
-
-               if (taskSlotMapping != null) {
-                       return taskSlotMapping.getTask();
-               } else {
-                       return null;
-               }
-       }
-
-       private Task removeTask(ExecutionAttemptID executionAttemptID) {
-               TaskSlotMapping taskSlotMapping = 
taskSlotMappings.remove(executionAttemptID);
-
-               if (taskSlotMapping != null) {
-                       final Task task = taskSlotMapping.getTask();
-                       final TaskSlot taskSlot = taskSlotMapping.getTaskSlot();
-
-                       taskSlot.remove(task);
-
-                       return task;
-               } else {
-                       return null;
-               }
-       }
-
-       private Iterable<Task> getAllTasks() {
-               final Iterator<TaskSlotMapping> taskEntryIterator = 
taskSlotMappings.values().iterator();
-               final Iterator<Task> iterator = new Iterator<Task>() {
-                       @Override
-                       public boolean hasNext() {
-                               return taskEntryIterator.hasNext();
-                       }
-
-                       @Override
-                       public Task next() {
-                               return taskEntryIterator.next().getTask();
-                       }
-
-                       @Override
-                       public void remove() {
-                               taskEntryIterator.remove();
-                       }
-               };
-
-               return new Iterable<Task>() {
-                       @Override
-                       public Iterator<Task> iterator() {
-                               return iterator;
-                       }
-               };
-       }
-
-       private void clearTasks() {
-               taskSlotMappings.clear();
-
-               for (TaskSlot taskSlot: taskSlots.values()) {
-                       taskSlot.clear();
-               }
-       }
-
        private void failTask(final ExecutionAttemptID executionAttemptID, 
final Throwable cause) {
-               final Task task = getTask(executionAttemptID);
+               final Task task = taskSlotTable.getTask(executionAttemptID);
 
                if (task != null) {
                        try {
@@ -568,18 +517,6 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
-       private void cancelAndClearAllTasks(Throwable cause) {
-               log.info("Cancellaing all computations and discarding all 
cached data.");
-
-               Iterable<Task> tasks = getAllTasks();
-
-               for (Task task: tasks) {
-                       task.failExternally(cause);
-               }
-
-               clearTasks();
-       }
-
        private void updateTaskExecutionState(
                        final UUID jobMasterLeaderId,
                        final JobMasterGateway jobMasterGateway,
@@ -602,11 +539,10 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
 
        private void unregisterTaskAndNotifyFinalState(
                        final UUID jobMasterLeaderId,
-                       final JobMasterGateway jobMasterGateway,
-                       final ExecutionAttemptID executionAttemptID)
-       {
-               Task task = removeTask(executionAttemptID);
+                       final JobMasterGateway jobMasterGateway,                
+                       final ExecutionAttemptID executionAttemptID) {
 
+               Task task = taskSlotTable.removeTask(executionAttemptID);
                if (task != null) {
                        if (!task.getExecutionState().isTerminal()) {
                                try {
@@ -718,6 +654,41 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
+       private void freeSlot(AllocationID allocationId) {
+               Preconditions.checkNotNull(allocationId);
+
+               try {
+                       int freedSlotIndex = 
taskSlotTable.freeSlot(allocationId);
+
+                       if (freedSlotIndex != -1 && 
isConnectedToResourceManager()) {
+                               // the slot was freed. Tell the RM about it
+                               ResourceManagerGateway resourceManagerGateway = 
resourceManagerConnection.getTargetGateway();
+
+                               resourceManagerGateway.notifySlotAvailable(
+                                       
resourceManagerConnection.getTargetLeaderId(),
+                                       
resourceManagerConnection.getRegistrationId(),
+                                       new SlotID(getResourceID(), 
freedSlotIndex));
+                       }
+               } catch (SlotNotFoundException e) {
+                       log.debug("Could not free slot for allocation id {}.", 
allocationId, e);
+               }
+       }
+
+       private void timeoutSlot(AllocationID allocationId, UUID ticket) {
+               Preconditions.checkNotNull(allocationId);
+               Preconditions.checkNotNull(ticket);
+
+               if (taskSlotTable.isValidTimeout(allocationId, ticket)) {
+                       freeSlot(allocationId);
+               } else {
+                       log.debug("Received an invalid timeout for allocation 
id {} with ticket {}.", allocationId, ticket);
+               }
+       }
+
+       private boolean isConnectedToResourceManager() {
+               return (resourceManagerConnection != null && 
resourceManagerConnection.isConnected());
+       }
+
        // 
------------------------------------------------------------------------
        //  Properties
        // 
------------------------------------------------------------------------
@@ -778,7 +749,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
        /**
         * The listener for leader changes of the resource manager
         */
-       private class ResourceManagerLeaderListener implements 
LeaderRetrievalListener {
+       private final class ResourceManagerLeaderListener implements 
LeaderRetrievalListener {
 
                @Override
                public void notifyLeaderAddress(final String leaderAddress, 
final UUID leaderSessionID) {
@@ -796,7 +767,7 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
-       private class TaskManagerActionsImpl implements TaskManagerActions {
+       private final class TaskManagerActionsImpl implements 
TaskManagerActions {
                private final UUID jobMasterLeaderId;
                private final JobMasterGateway jobMasterGateway;
 
@@ -837,4 +808,27 @@ public class TaskExecutor extends 
RpcEndpoint<TaskExecutorGateway> {
                }
        }
 
+       private class SlotActionsImpl implements SlotActions {
+
+               @Override
+               public void freeSlot(final AllocationID allocationId) {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       
TaskExecutor.this.freeSlot(allocationId);
+                               }
+                       });
+               }
+
+               @Override
+               public void timeoutSlot(final AllocationID allocationId, final 
UUID ticket) {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       
TaskExecutor.this.timeoutSlot(allocationId, ticket);
+                               }
+                       });
+               }
+       }
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
index bb66655..ca1d2ce 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java
@@ -98,6 +98,7 @@ public class TaskManagerRunner implements FatalErrorHandler {
                        taskManagerServices.getTaskManagerMetricGroup(),
                        taskManagerServices.getBroadcastVariableManager(),
                        taskManagerServices.getFileCache(),
+                       taskManagerServices.getTaskSlotTable(),
                        this);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index e264a1c..c1728b4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
@@ -38,6 +40,8 @@ import 
org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.netty.DisabledKvStateRequestStats;
 import org.apache.flink.runtime.query.netty.KvStateServer;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import 
org.apache.flink.runtime.taskexecutor.utils.TaskExecutorMetricsInitializer;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
@@ -48,6 +52,9 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 /**
  * Container for {@link TaskExecutor} services such as the {@link 
MemoryManager}, {@link IOManager},
@@ -65,6 +72,7 @@ public class TaskManagerServices {
        private final TaskManagerMetricGroup taskManagerMetricGroup;
        private final BroadcastVariableManager broadcastVariableManager;
        private final FileCache fileCache;
+       private final TaskSlotTable taskSlotTable;
 
        private TaskManagerServices(
                TaskManagerLocation taskManagerLocation,
@@ -74,7 +82,8 @@ public class TaskManagerServices {
                MetricRegistry metricRegistry,
                TaskManagerMetricGroup taskManagerMetricGroup,
                BroadcastVariableManager broadcastVariableManager,
-               FileCache fileCache) {
+               FileCache fileCache,
+               TaskSlotTable taskSlotTable) {
 
                this.taskManagerLocation = 
Preconditions.checkNotNull(taskManagerLocation);
                this.memoryManager = Preconditions.checkNotNull(memoryManager);
@@ -84,6 +93,7 @@ public class TaskManagerServices {
                this.taskManagerMetricGroup = 
Preconditions.checkNotNull(taskManagerMetricGroup);
                this.broadcastVariableManager = 
Preconditions.checkNotNull(broadcastVariableManager);
                this.fileCache = Preconditions.checkNotNull(fileCache);
+               this.taskSlotTable = Preconditions.checkNotNull(taskSlotTable);
        }
 
        // 
--------------------------------------------------------------------------------------------
@@ -121,6 +131,10 @@ public class TaskManagerServices {
        public FileCache getFileCache() {
                return fileCache;
        }
+       
+       public TaskSlotTable getTaskSlotTable() {
+               return taskSlotTable;
+       }
 
        // 
--------------------------------------------------------------------------------------------
        //  Static factory methods for task manager services
@@ -167,6 +181,16 @@ public class TaskManagerServices {
 
                final FileCache fileCache = new 
FileCache(taskManagerServicesConfiguration.getTmpDirPaths());
 
+               final List<ResourceProfile> resourceProfiles = new 
ArrayList<>(taskManagerServicesConfiguration.getNumberOfSlots());
+
+               for (int i = 0; i < 
taskManagerServicesConfiguration.getNumberOfSlots(); i++) {
+                       resourceProfiles.add(new ResourceProfile(1.0, 42L));
+               }
+
+               final TimerService<AllocationID> timerService = new 
TimerService<>(new ScheduledThreadPoolExecutor(1));
+
+               final TaskSlotTable taskSlotTable = new 
TaskSlotTable(resourceProfiles, timerService);
+               
                return new TaskManagerServices(
                        taskManagerLocation,
                        memoryManager,
@@ -175,7 +199,8 @@ public class TaskManagerServices {
                        metricRegistry,
                        taskManagerMetricGroup,
                        broadcastVariableManager,
-                       fileCache);
+                       fileCache,
+                       taskSlotTable);
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
index 80dfc09..036a890 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesConfiguration.java
@@ -173,6 +173,8 @@ public class TaskManagerServicesConfiguration {
 
                final MetricRegistryConfiguration metricRegistryConfiguration = 
MetricRegistryConfiguration.fromConfiguration(configuration);
 
+
+
                return new TaskManagerServicesConfiguration(
                        remoteAddress,
                        tmpDirs,

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
deleted file mode 100644
index 4fc1d66..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlot.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor;
-
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.util.Preconditions;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Container for multiple {@link Task} belonging to the same slot.
- */
-public class TaskSlot {
-       private final AllocationID allocationID;
-       private final ResourceID resourceID;
-       private final Map<ExecutionAttemptID, Task> tasks;
-
-       public TaskSlot(AllocationID allocationID, ResourceID resourceID) {
-               this.allocationID = Preconditions.checkNotNull(allocationID);
-               this.resourceID = Preconditions.checkNotNull(resourceID);
-               tasks = new HashMap<>(4);
-       }
-
-       public AllocationID getAllocationID() {
-               return allocationID;
-       }
-
-       public ResourceID getResourceID() {
-               return resourceID;
-       }
-
-       public boolean add(Task task) {
-               // sanity check
-               
Preconditions.checkArgument(allocationID.equals(task.getAllocationID()));
-
-               Task oldTask = tasks.put(task.getExecutionId(), task);
-
-               if (oldTask != null) {
-                       tasks.put(task.getExecutionId(), oldTask);
-                       return false;
-               } else {
-                       return true;
-               }
-       }
-
-       public Task remove(Task task) {
-               return tasks.remove(task.getExecutionId());
-       }
-
-       public void clear() {
-               tasks.clear();
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
deleted file mode 100644
index e67fd52..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskSlotMapping.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskexecutor;
-
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Mapping between a {@link Task} and its {@link TaskSlot}.
- */
-public class TaskSlotMapping {
-
-       private final Task task;
-       private final TaskSlot taskSlot;
-
-       public TaskSlotMapping(Task task, TaskSlot taskSlot) {
-               this.task = Preconditions.checkNotNull(task);
-               this.taskSlot = Preconditions.checkNotNull(taskSlot);
-       }
-
-       public Task getTask() {
-               return task;
-       }
-
-       public TaskSlot getTaskSlot() {
-               return taskSlot;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java
new file mode 100644
index 0000000..f7ed235
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotActions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+import java.util.UUID;
+
+/**
+ * Interface to trigger slot actions from within the {@link TaskSlotTable}.
+ */
+public interface SlotActions {
+
+       /**
+        * Free the task slot with the given allocation id.
+        *
+        * @param allocationId to identify the slot to be freed
+        */
+       void freeSlot(AllocationID allocationId);
+
+       /**
+        * Timeout the task slot for the given allocation id. The timeout is 
identified by the given
+        * ticket to filter invalid timeouts out.
+        *
+        * @param allocationId identifying the task slot to be timed out
+        * @param ticket allowing to filter invalid timeouts out
+        */
+       void timeoutSlot(AllocationID allocationId, UUID ticket);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java
new file mode 100644
index 0000000..b0ddc5d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotActiveException.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+/**
+ * Exception indicating that the given {@link TaskSlot} was not in state 
active.
+ */
+public class SlotNotActiveException extends Exception {
+
+       private static final long serialVersionUID = 4305837511564584L;
+
+       public SlotNotActiveException(JobID jobId, AllocationID allocationId) {
+               super("No active slot for job " + jobId + " with allocation id 
" + allocationId + '.');
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java
new file mode 100644
index 0000000..c639b16
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/SlotNotFoundException.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+
+/**
+ * Exception indicating that a {@link TaskSlot} could not be found.
+ */
+public class SlotNotFoundException extends Exception {
+
+       private static final long serialVersionUID = -883614807750137925L;
+
+       public SlotNotFoundException(AllocationID allocationId) {
+               this("Could not find slot for " + allocationId + '.');
+       }
+
+       public SlotNotFoundException(String message) {
+               super(message);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
new file mode 100644
index 0000000..0942772
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlot.java
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+/**
+ * Container for multiple {@link Task} belonging to the same slot. A {@link 
TaskSlot} can be in one
+ * of the following states:
+ * <ul>
+ *     <li>Free - The slot is empty and not allocated to a job</li>
+ *     <li>Releasing - The slot is about to be freed after it has become 
empty.</li>
+ *     <li>Allocated - The slot has been allocated for a job.</li>
+ *     <li>Active - The slot is in active use by a job manager which is the 
leader of the allocating job.</li>
+ * </ul>
+ * <p>
+ * A task slot can only be allocated if it is in state free. An allocated task 
slot can transition
+ * to state active.
+ *<p>
+ * An active slot allows to add tasks from the respective job and with the 
correct allocation id.
+ * An active slot can be marked as inactive which sets the state back to 
allocated.
+ * <p>
+ * An allocated or active slot can only be freed if it is empty. If it is not 
empty, then it's state
+ * can be set to releasing indicating that it can be freed once it becomes 
empty.
+ */
+public class TaskSlot {
+
+       /** Index of the task slot */
+       private final int index;
+
+       /** Resource characteristics for this slot */
+       private final ResourceProfile resourceProfile;
+
+       /** Tasks running in this slot */
+       private final Map<ExecutionAttemptID, Task> tasks;
+
+       /** State of this slot */
+       private TaskSlotState state;
+
+       /** Job id to which the slot has been allocated; null if not allocated 
*/
+       private JobID jobId;
+
+       /** Allocation id of this slot; null if not allocated */
+       private AllocationID allocationId;
+
+       TaskSlot(final int index, final ResourceProfile resourceProfile) {
+               Preconditions.checkArgument(0 <= index, "The index must be 
greater than 0.");
+               this.index = index;
+               this.resourceProfile = 
Preconditions.checkNotNull(resourceProfile);
+
+               this.tasks = new HashMap<>(4);
+               this.state = TaskSlotState.FREE;
+
+               this.jobId = null;
+               this.allocationId = null;
+       }
+
+       // 
----------------------------------------------------------------------------------
+       // State accessors
+       // 
----------------------------------------------------------------------------------
+
+       public int getIndex() {
+               return index;
+       }
+
+       public ResourceProfile getResourceProfile() {
+               return resourceProfile;
+       }
+
+       public JobID getJobId() {
+               return jobId;
+       }
+
+       public AllocationID getAllocationId() {
+               return allocationId;
+       }
+
+       TaskSlotState getState() {
+               return state;
+       }
+
+       public boolean isEmpty() {
+               return tasks.isEmpty();
+       }
+
+       public boolean isFree() {
+               return TaskSlotState.FREE == state;
+       }
+
+       public boolean isActive(JobID activeJobId, AllocationID 
activeAllocationId) {
+               Preconditions.checkNotNull(activeJobId);
+               Preconditions.checkNotNull(activeAllocationId);
+
+               return TaskSlotState.ACTIVE == state &&
+                       activeJobId.equals(jobId) &&
+                       activeAllocationId.equals(allocationId);
+       }
+
+       public boolean isAllocated(JobID jobIdToCheck, AllocationID 
allocationIDToCheck) {
+               Preconditions.checkNotNull(jobIdToCheck);
+               Preconditions.checkNotNull(allocationIDToCheck);
+
+               return jobIdToCheck.equals(jobId) && 
allocationIDToCheck.equals(allocationId) &&
+                       (TaskSlotState.ACTIVE == state || 
TaskSlotState.ALLOCATED == state);
+       }
+
+       public boolean isReleasing() {
+               return TaskSlotState.RELEASING == state;
+       }
+
+       /**
+        * Get all tasks running in this task slot.
+        *
+        * @return Iterator to all currently contained tasks in this task slot.
+        */
+       public Iterator<Task> getTasks() {
+               return tasks.values().iterator();
+       }
+
+       // 
----------------------------------------------------------------------------------
+       // State changing methods
+       // 
----------------------------------------------------------------------------------
+
+       /**
+        * Add the given task to the task slot. This is only possible if there 
is not already another
+        * task with the same execution attempt id added to the task slot. In 
this case, the method
+        * returns true. Otherwise the task slot is left unchanged and false is 
returned.
+        *
+        * In case that the task slot state is not active an {@link 
IllegalStateException} is thrown.
+        * In case that the task's job id and allocation id don't match with 
the job id and allocation
+        * id for which the task slot has been allocated, an {@link 
IllegalArgumentException} is thrown.
+        *
+        * @param task to be added to the task slot
+        * @throws IllegalStateException if the task slot is not in state active
+        * @return true if the task was added to the task slot; otherwise false
+        */
+       public boolean add(Task task) {
+               // Check that this slot has been assigned to the job sending 
this task
+               Preconditions.checkArgument(task.getJobID().equals(jobId), "The 
task's job id does not match the " +
+                       "job id for which the slot has been allocated.");
+               
Preconditions.checkArgument(task.getAllocationId().equals(allocationId), "The 
task's allocation " +
+                       "id does not match the allocation id for which the slot 
has been allocated.");
+               Preconditions.checkState(TaskSlotState.ACTIVE == state, "The 
task slot is not in state active.");
+
+               Task oldTask = tasks.put(task.getExecutionId(), task);
+
+               if (oldTask != null) {
+                       tasks.put(task.getExecutionId(), oldTask);
+                       return false;
+               } else {
+                       return true;
+               }
+       }
+
+       /**
+        * Remove the task identified by the given execution attempt id.
+        *
+        * @param executionAttemptId identifying the task to be removed
+        * @return The removed task if there was any; otherwise null.
+        */
+       public Task remove(ExecutionAttemptID executionAttemptId) {
+               return tasks.remove(executionAttemptId);
+       }
+
+       /**
+        * Removes all tasks from this task slot.
+        */
+       public void clear() {
+               tasks.clear();
+       }
+
+       /**
+        * Allocate the task slot for the given job and allocation id. If the 
slot could be allocated,
+        * or is already allocated/active for the given job and allocation id, 
then the method returns
+        * true. Otherwise it returns false.
+        *
+        * A slot can only be allocated if it's current state is free.
+        *
+        * @param newJobId to allocate the slot for
+        * @param newAllocationId to identify the slot allocation
+        * @return True if the slot was allocated for the given job and 
allocation id; otherwise false
+        */
+       public boolean allocate(JobID newJobId, AllocationID newAllocationId) {
+               if (TaskSlotState.FREE == state) {
+                       // sanity checks
+                       Preconditions.checkState(allocationId == null);
+                       Preconditions.checkState(jobId == null);
+
+                       this.jobId = Preconditions.checkNotNull(newJobId);
+                       this.allocationId = 
Preconditions.checkNotNull(newAllocationId);
+
+                       state = TaskSlotState.ALLOCATED;
+
+                       return true;
+               } else if (TaskSlotState.ALLOCATED == state || 
TaskSlotState.ACTIVE == state) {
+                       Preconditions.checkNotNull(newJobId);
+                       Preconditions.checkNotNull(newAllocationId);
+
+                       return newJobId.equals(jobId) && 
newAllocationId.equals(allocationId);
+               } else {
+                       return false;
+               }
+       }
+
+       /**
+        * Mark this slot as active. A slot can only be marked active if it's 
in state allocated.
+        *
+        * The method returns true if the slot was set to active. Otherwise it 
returns false.
+        *
+        * @return True if the new state of the slot is active; otherwise false
+        */
+       public boolean markActive() {
+               if (TaskSlotState.ALLOCATED == state || TaskSlotState.ACTIVE == 
state) {
+                       state = TaskSlotState.ACTIVE;
+
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
+       /**
+        * Mark the slot as inactive/allocated. A slot can only be marked as 
inactive/allocated if it's
+        * in state allocated or active.
+        *
+        * @return True if the new state of the slot is allocated; otherwise 
false
+        */
+       public boolean markInactive() {
+               if (TaskSlotState.ACTIVE == state || TaskSlotState.ALLOCATED == 
state) {
+                       state = TaskSlotState.ALLOCATED;
+
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
+       /**
+        * Mark the slot as free. A slot can only marked as free if it's empty.
+        *
+        * @return True if the new state is free; otherwise false
+        */
+       public boolean markFree() {
+               if (isEmpty()) {
+                       state = TaskSlotState.FREE;
+                       this.jobId = null;
+                       this.allocationId = null;
+
+                       return true;
+               } else {
+                       return false;
+               }
+       }
+
+       /**
+        * Mark this slot as releasing. A slot can always be marked as 
releasing.
+        *
+        * @return True
+        */
+       public boolean markReleasing() {
+               state = TaskSlotState.RELEASING;
+               return true;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java
new file mode 100644
index 0000000..e3ba903
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotState.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+/**
+ * Internal task slot state
+ */
+enum TaskSlotState {
+       ACTIVE, // Slot is in active use by a job manager responsible for a job
+       ALLOCATED, // Slot has been allocated for a job but not yet given to a 
job manager
+       RELEASING, // Slot is not empty but tasks are failed. Upon removal of 
all tasks, it will be released
+       FREE // Slot is free
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
new file mode 100644
index 0000000..42cb919
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TaskSlotTable.java
@@ -0,0 +1,682 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.UUID;
+
+/**
+ * Container for multiple {@link TaskSlot} instances. Additionally, it 
maintains multiple indices
+ * for faster access to tasks and sets of allocated slots.
+ * <p>
+ * The task slot table automatically registers timeouts for allocated slots 
which cannot be assigned
+ * to a job manager.
+ * <p>
+ * Before the task slot table can be used, it must be started via the {@link 
#start} method.
+ */
+public class TaskSlotTable implements TimeoutListener<AllocationID> {
+
+       private static final Logger LOG = 
LoggerFactory.getLogger(TaskSlotTable.class);
+
+       /** Timer service used to time out allocated slots */
+       private final TimerService<AllocationID> timerService;
+
+       /** The list of all task slots */
+       private final List<TaskSlot> taskSlots;
+
+       /** Mapping from allocation id to task slot */
+       private final Map<AllocationID, TaskSlot> allocationIDTaskSlotMap;
+
+       /** Mapping from execution attempt id to task and task slot */
+       private final Map<ExecutionAttemptID, TaskSlotMapping> taskSlotMappings;
+
+       /** Mapping from job id to allocated slots for a job */
+       private final Map<JobID, Set<AllocationID>> slotsPerJob;
+
+       /** Interface for slot actions, such as freeing them or timing them out 
*/
+       private SlotActions slotActions;
+
+       /** The timeout for allocated slots */
+       private Time slotTimeout;
+
+       /** Whether the table has been started */
+       private boolean started;
+
+       public TaskSlotTable(
+               final Collection<ResourceProfile> resourceProfiles,
+               final TimerService<AllocationID> timerService) {
+
+               int numberSlots = resourceProfiles.size();
+
+               Preconditions.checkArgument(0 < numberSlots, "The number of 
task slots must be greater than 0.");
+
+               this.timerService = Preconditions.checkNotNull(timerService);
+
+               taskSlots = Arrays.asList(new TaskSlot[numberSlots]);
+
+               int index = 0;
+
+               // create the task slots for the given resource profiles
+               for (ResourceProfile resourceProfile: resourceProfiles) {
+                       taskSlots.set(index, new TaskSlot(index, 
resourceProfile));
+                       ++index;
+               }
+
+               allocationIDTaskSlotMap = new HashMap<>(numberSlots);
+
+               taskSlotMappings = new HashMap<>(4 * numberSlots);
+
+               slotsPerJob = new HashMap<>(4);
+
+               slotActions = null;
+               slotTimeout = null;
+               started = false;
+       }
+
+       /**
+        * Start the task slot table with the given slot actions and slot 
timeout value.
+        *
+        * @param initialSlotActions to use for slot actions
+        * @param initialSlotTimeout to use for slot timeouts
+        */
+       public void start(SlotActions initialSlotActions, Time 
initialSlotTimeout) {
+               this.slotActions = 
Preconditions.checkNotNull(initialSlotActions);
+               this.slotTimeout = 
Preconditions.checkNotNull(initialSlotTimeout);
+
+               timerService.start(this);
+
+               started = true;
+       }
+
+       /**
+        * Stop the task slot table.
+        */
+       public void stop() {
+               started = false;
+               timerService.stop();
+               slotTimeout = null;
+               slotActions = null;
+       }
+
+       // ---------------------------------------------------------------------
+       // Slot methods
+       // ---------------------------------------------------------------------
+
+       /**
+        * Allocate the slot with the given index for the given job and 
allocation id. Returns true if
+        * the slot could be allocated. Otherwise it returns false;
+        *
+        * @param index of the task slot to allocate
+        * @param jobId to allocate the task slot for
+        * @param allocationId identifying the allocation
+        * @return True if the task slot could be allocated; otherwise false
+        */
+       public boolean allocateSlot(int index, JobID jobId, AllocationID 
allocationId) {
+               checkInit();
+
+               TaskSlot taskSlot = taskSlots.get(index);
+
+               boolean result = taskSlot.allocate(jobId, allocationId);
+
+               if (result) {
+                       // update the alloction id to task slot map
+                       allocationIDTaskSlotMap.put(allocationId, taskSlot);
+
+                       // register a timeout for this slot since it's in state 
allocated
+                       timerService.registerTimeout(allocationId, 
slotTimeout.getSize(), slotTimeout.getUnit());
+
+                       // add this slot to the set of job slots
+                       Set<AllocationID> slots = slotsPerJob.get(jobId);
+
+                       if (slots == null) {
+                               slots = new HashSet<>(4);
+                               slotsPerJob.put(jobId, slots);
+                       }
+
+                       slots.add(allocationId);
+               }
+
+               return result;
+       }
+
+       /**
+        * Marks the slot under the given allocation id as active. If the slot 
could not be found, then
+        * a {@link SlotNotFoundException} is thrown.
+        *
+        * @param allocationId to identify the task slot to mark as active
+        * @throws SlotNotFoundException if the slot could not be found for the 
given allocation id
+        * @return True if the slot could be marked active
+        */
+       public boolean markSlotActive(AllocationID allocationId) throws 
SlotNotFoundException {
+               checkInit();
+
+               TaskSlot taskSlot = getTaskSlot(allocationId);
+
+               if (taskSlot != null) {
+                       if (taskSlot.markActive()) {
+                               // unregister a potential timeout
+                               timerService.unregisterTimeout(allocationId);
+
+                               return true;
+                       } else {
+                               return false;
+                       }
+               } else {
+                       throw new SlotNotFoundException(allocationId);
+               }
+       }
+
+       /**
+        * Marks the slot under the given allocation id as inactive. If the 
slot could not be found,
+        * then a {@link SlotNotFoundException} is thrown.
+        *
+        * @param allocationId to identify the task slot to mark as inactive
+        * @throws SlotNotFoundException if the slot could not be found for the 
given allocation id
+        * @return True if the slot could be marked inactive
+        */
+       public boolean markSlotInactive(AllocationID allocationId) throws 
SlotNotFoundException {
+               checkInit();
+
+               TaskSlot taskSlot = getTaskSlot(allocationId);
+
+               if (taskSlot != null) {
+                       if (taskSlot.markInactive()) {
+                               // register a timeout to free the slot
+                               timerService.registerTimeout(allocationId, 
slotTimeout.getSize(), slotTimeout.getUnit());
+
+                               return true;
+                       } else {
+                               return false;
+                       }
+               } else {
+                       throw new SlotNotFoundException(allocationId);
+               }
+       }
+
+       /**
+        * Try to free the slot. If the slot is empty it will set the state of 
the task slot to free
+        * and return its index. If the slot is not empty, then it will set the 
state of the task slot
+        * to releasing, fail all tasks and return -1.
+        *
+        * @param allocationId identifying the task slot to be freed
+        * @throws SlotNotFoundException if there is not task slot for the 
given allocation id
+        * @return Index of the freed slot if the slot could be freed; 
otherwise -1
+        */
+       public int freeSlot(AllocationID allocationId) throws 
SlotNotFoundException {
+               return freeSlot(allocationId, new Exception("The task slot of 
this task is being freed."));
+       }
+
+       /**
+        * Tries to free the slot. If the slot is empty it will set the state 
of the task slot to free
+        * and return its index. If the slot is not empty, then it will set the 
state of the task slot
+        * to releasing, fail all tasks and return -1.
+        *
+        * @param allocationId identifying the task slot to be freed
+        * @param cause to fail the tasks with if slot is not empty
+        * @throws SlotNotFoundException if there is not task slot for the 
given allocation id
+        * @return Index of the freed slot if the slot could be freed; 
otherwise -1
+        */
+       public int freeSlot(AllocationID allocationId, Throwable cause) throws 
SlotNotFoundException {
+               checkInit();
+
+               TaskSlot taskSlot = getTaskSlot(allocationId);
+
+               if (taskSlot != null) {
+                       LOG.info("Free slot {}.", allocationId, cause);
+
+                       final JobID jobId = taskSlot.getJobId();
+
+                       if (taskSlot.markFree()) {
+                               // remove the allocation id to task slot mapping
+                               allocationIDTaskSlotMap.remove(allocationId);
+
+                               // unregister a potential timeout
+                               timerService.unregisterTimeout(allocationId);
+
+                               Set<AllocationID> slots = 
slotsPerJob.get(jobId);
+
+                               if (slots == null) {
+                                       throw new IllegalStateException("There 
are no more slots allocated for the job " + jobId +
+                                               ". This indicates a programming 
bug.");
+                               }
+
+                               slots.remove(allocationId);
+
+                               if (slots.isEmpty()) {
+                                       slotsPerJob.remove(jobId);
+                               }
+
+                               return taskSlot.getIndex();
+                       } else {
+                               // we couldn't free the task slot because it 
still contains task, fail the tasks
+                               // and set the slot state to releasing so that 
it gets eventually freed
+                               taskSlot.markReleasing();
+
+                               Iterator<Task> taskIterator = 
taskSlot.getTasks();
+
+                               while (taskIterator.hasNext()) {
+                                       
taskIterator.next().failExternally(cause);
+                               }
+
+                               return -1;
+                       }
+               } else {
+                       throw new SlotNotFoundException(allocationId);
+               }
+       }
+
+       /**
+        * Check whether the timeout with ticket is valid for the given 
allocation id.
+        *
+        * @param allocationId to check against
+        * @param ticket of the timeout
+        * @return True if the timeout is valid; otherwise false
+        */
+       public boolean isValidTimeout(AllocationID allocationId, UUID ticket) {
+               checkInit();
+
+               return timerService.isValid(allocationId, ticket);
+       }
+
+       /**
+        * Check whether the slot for the given index is allocated for the 
given job and allocation id.
+        *
+        * @param index of the task slot
+        * @param jobId for which the task slot should be allocated
+        * @param allocationId which should match the task slot's allocation id
+        * @return True if the given task slot is allocated for the given job 
and allocation id
+        */
+       public boolean isAllocated(int index, JobID jobId, AllocationID 
allocationId) {
+               checkInit();
+
+               TaskSlot taskSlot = taskSlots.get(index);
+
+               return taskSlot.isAllocated(jobId, allocationId);
+       }
+
+       /**
+        * Check whether there exists an active slot for the given job and 
allocation id.
+        *
+        * @param jobId of the allocated slot
+        * @param allocationId identifying the allocation
+        * @return True if there exists a task slot which is active for the 
given job and allocation id.
+        */
+       public boolean existActiveSlot(JobID jobId, AllocationID allocationId) {
+               TaskSlot taskSlot = getTaskSlot(allocationId);
+
+               if (taskSlot != null) {
+                       return taskSlot.isActive(jobId, allocationId);
+               } else {
+                       return false;
+               }
+       }
+
+       /**
+        * Check whether the task slot with the given index is free.
+        *
+        * @param index of the task slot
+        * @return True if the task slot is free; otherwise false
+        */
+       public boolean isSlotFree(int index) {
+               TaskSlot taskSlot = taskSlots.get(index);
+
+               return taskSlot.isFree();
+       }
+
+       /**
+        * Check whether the job has allocated (not active) slots.
+        *
+        * @param jobId for which to check for allocated slots
+        * @return True if there are allocated slots for the given job id.
+        */
+       public boolean hasAllocatedSlots(JobID jobId) {
+               return getAllocatedSlots(jobId).hasNext();
+       }
+
+       /**
+        * Return an iterator of allocated slots (their allocation ids) for the 
given job id.
+        *
+        * @param jobId for which to return the allocated slots
+        * @return Iterator of allocation ids of allocated slots.
+        */
+       public Iterator<AllocationID> getAllocatedSlots(JobID jobId) {
+               return new AllocationIDIterator(jobId, TaskSlotState.ALLOCATED);
+       }
+
+       /**
+        * Return an iterator of active slots (their application ids) for the 
given job id.
+        *
+        * @param jobId for which to return the active slots
+        * @return Iterator of allocation ids of active slots
+        */
+       public Iterator<AllocationID> getActiveSlots(JobID jobId) {
+               return new AllocationIDIterator(jobId, TaskSlotState.ACTIVE);
+       }
+
+       // ---------------------------------------------------------------------
+       // Task methods
+       // ---------------------------------------------------------------------
+
+       /**
+        * Add the given task to the slot identified by the task's allocation 
id.
+        *
+        * @param task to add to the task slot with the respective allocation id
+        * @throws SlotNotFoundException if there was no slot for the given 
allocation id
+        * @throws SlotNotActiveException if there was no slot active for 
task's job and allocation id
+        * @return True if the task could be added to the task slot; otherwise 
false
+        */
+       public boolean addTask(Task task) throws SlotNotFoundException, 
SlotNotActiveException {
+               Preconditions.checkNotNull(task);
+
+               TaskSlot taskSlot = getTaskSlot(task.getAllocationId());
+
+               if (taskSlot != null) {
+                       if (taskSlot.isActive(task.getJobID(), 
task.getAllocationId())) {
+                               if (taskSlot.add(task)) {
+                                       
taskSlotMappings.put(task.getExecutionId(), new TaskSlotMapping(task, 
taskSlot));
+
+                                       return true;
+                               } else {
+                                       return false;
+                               }
+                       } else {
+                               throw new 
SlotNotActiveException(task.getJobID(), task.getAllocationId());
+                       }
+               } else {
+                       throw new 
SlotNotFoundException(taskSlot.getAllocationId());
+               }
+       }
+
+       /**
+        * Remove the task with the given execution attempt id from its task 
slot. If the owning task
+        * slot is in state releasing and empty after removing the task, the 
slot is freed via the
+        * slot actions.
+        *
+        * @param executionAttemptID identifying the task to remove
+        * @return The removed task if there is any for the given execution 
attempt id; otherwise null
+        */
+       public Task removeTask(ExecutionAttemptID executionAttemptID) {
+               TaskSlotMapping taskSlotMapping = 
taskSlotMappings.remove(executionAttemptID);
+
+               if (taskSlotMapping != null) {
+                       Task task = taskSlotMapping.getTask();
+                       TaskSlot taskSlot = taskSlotMapping.getTaskSlot();
+
+                       taskSlot.remove(task.getExecutionId());
+
+                       if (taskSlot.isReleasing() && taskSlot.isEmpty()) {
+                               
slotActions.freeSlot(taskSlot.getAllocationId());
+                       }
+
+                       return task;
+               } else {
+                       return null;
+               }
+       }
+
+       /**
+        * Get the task for the given execution attempt id. If none could be 
found, then return null.
+        *
+        * @param executionAttemptID identifying the requested task
+        * @return The task for the given execution attempt id if it exist; 
otherwise null
+        */
+       public Task getTask(ExecutionAttemptID executionAttemptID) {
+               TaskSlotMapping taskSlotMapping = 
taskSlotMappings.get(executionAttemptID);
+
+               if (taskSlotMapping != null) {
+                       return taskSlotMapping.getTask();
+               } else {
+                       return null;
+               }
+       }
+
+       /**
+        * Return an iterator over all tasks for a given job.
+        *
+        * @param jobId identifying the job of the requested tasks
+        * @return Iterator over all task for a given job
+        */
+       public Iterator<Task> getTasks(JobID jobId) {
+               return new TaskIterator(jobId);
+       }
+
+       // ---------------------------------------------------------------------
+       // TimeoutListener methods
+       // ---------------------------------------------------------------------
+
+       @Override
+       public void notifyTimeout(AllocationID key, UUID ticket) {
+               if (slotActions != null) {
+                       slotActions.timeoutSlot(key, ticket);
+               }
+       }
+
+       // ---------------------------------------------------------------------
+       // Internal methods
+       // ---------------------------------------------------------------------
+
+       private TaskSlot getTaskSlot(AllocationID allocationId) {
+               Preconditions.checkNotNull(allocationId);
+
+               TaskSlot taskSlot = allocationIDTaskSlotMap.get(allocationId);
+
+               return taskSlot;
+       }
+
+       private void checkInit() {
+               Preconditions.checkState(started, "The " + 
TaskSlotTable.class.getSimpleName() + " has to be started.");
+       }
+
+       // ---------------------------------------------------------------------
+       // Static utility classes
+       // ---------------------------------------------------------------------
+
+       /**
+        * Mapping class between a {@link Task} and a {@link TaskSlot}.
+        */
+       private static final class TaskSlotMapping {
+               private final Task task;
+               private final TaskSlot taskSlot;
+
+
+               private TaskSlotMapping(Task task, TaskSlot taskSlot) {
+                       this.task = Preconditions.checkNotNull(task);
+                       this.taskSlot = Preconditions.checkNotNull(taskSlot);
+               }
+
+               public Task getTask() {
+                       return task;
+               }
+
+               public TaskSlot getTaskSlot() {
+                       return taskSlot;
+               }
+       }
+
+       /**
+        * Iterator over {@link AllocationID} of the {@link TaskSlot} of a 
given job. Additionally,
+        * the task slots identified by the allocation ids are in the given 
state.
+        */
+       private final class AllocationIDIterator implements 
Iterator<AllocationID> {
+               private final Iterator<TaskSlot> iterator;
+
+               private AllocationIDIterator(JobID jobId, TaskSlotState state) {
+                               iterator = new TaskSlotIterator(jobId, state);
+               }
+
+               @Override
+               public boolean hasNext() {
+                       return iterator.hasNext();
+               }
+
+               @Override
+               public AllocationID next() {
+                       try {
+                               return iterator.next().getAllocationId();
+                       } catch (NoSuchElementException e) {
+                               throw new NoSuchElementException("No more 
allocation ids.");
+                       }
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException("Cannot remove 
allocation ids via this iterator.");
+               }
+       }
+
+       /**
+        * Iterator over {@link TaskSlot} which fulfill a given state condition 
and belong to the given
+        * job.
+        */
+       private final class TaskSlotIterator implements Iterator<TaskSlot> {
+               private final Iterator<AllocationID> allSlots;
+               private final TaskSlotState state;
+
+               private TaskSlot currentSlot;
+
+               private TaskSlotIterator(JobID jobId, TaskSlotState state) {
+
+                       Set<AllocationID> allocationIds = 
slotsPerJob.get(jobId);
+
+                       if (allocationIds == null || allocationIds.isEmpty()) {
+                               allSlots = Collections.emptyIterator();
+                       } else {
+                               allSlots = allocationIds.iterator();
+                       }
+
+                       this.state = Preconditions.checkNotNull(state);
+
+                       this.currentSlot = null;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       while (currentSlot == null && allSlots.hasNext()) {
+                               AllocationID tempSlot = allSlots.next();
+
+                               TaskSlot taskSlot = getTaskSlot(tempSlot);
+
+                               if (taskSlot != null && taskSlot.getState() == 
state) {
+                                       currentSlot = taskSlot;
+                               }
+                       }
+
+                       return currentSlot != null;
+               }
+
+               @Override
+               public TaskSlot next() {
+                       if (currentSlot != null) {
+                               TaskSlot result = currentSlot;
+
+                               currentSlot = null;
+
+                               return result;
+                       } else {
+                               while (true) {
+                                       AllocationID tempSlot;
+
+                                       try {
+                                               tempSlot = allSlots.next();
+                                       } catch (NoSuchElementException e) {
+                                               throw new 
NoSuchElementException("No more task slots.");
+                                       }
+
+                                       TaskSlot taskSlot = 
getTaskSlot(tempSlot);
+
+                                       if (taskSlot != null && 
taskSlot.getState() == state) {
+                                               return taskSlot;
+                                       }
+                               }
+                       }
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException("Cannot remove 
task slots via this iterator.");
+               }
+       }
+
+       /**
+        * Iterator over all {@link Task} for a given job
+        */
+       private final class TaskIterator implements Iterator<Task> {
+               private final Iterator<TaskSlot> taskSlotIterator;
+
+               private Iterator<Task> currentTasks;
+
+               private TaskIterator(JobID jobId) {
+                       this.taskSlotIterator = new TaskSlotIterator(jobId, 
TaskSlotState.ACTIVE);
+
+                       this.currentTasks = null;
+               }
+
+               @Override
+               public boolean hasNext() {
+                       while ((currentTasks == null || 
!currentTasks.hasNext()) && taskSlotIterator.hasNext()) {
+                               TaskSlot taskSlot = taskSlotIterator.next();
+
+                               currentTasks = taskSlot.getTasks();
+                       }
+
+                       return (currentTasks != null && currentTasks.hasNext());
+               }
+
+               @Override
+               public Task next() {
+                       while ((currentTasks == null || 
!currentTasks.hasNext())) {
+                               TaskSlot taskSlot;
+
+                               try {
+                                       taskSlot = taskSlotIterator.next();
+                               } catch (NoSuchElementException e) {
+                                       throw new NoSuchElementException("No 
more tasks.");
+                               }
+
+                               currentTasks = taskSlot.getTasks();
+                       }
+
+                       return currentTasks.next();
+               }
+
+               @Override
+               public void remove() {
+                       throw new UnsupportedOperationException("Cannot remove 
tasks via this iterator.");
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java
new file mode 100644
index 0000000..3e75d74
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimeoutListener.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import java.util.UUID;
+
+/**
+ * Listener for timeout events by the {@link TimerService}.
+ * @param <K> Type of the timeout key
+ */
+public interface TimeoutListener<K> {
+
+       /**
+        * Notify the listener about the timeout for an event identified by 
key. Additionally the method
+        * is called with the timeout ticket which allows to identify outdated 
timeout events.
+        *
+        * @param key identifying the timed out event
+        * @param ticket used to check whether the timeout is still valid
+        */
+       void notifyTimeout(K key, UUID ticket);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
new file mode 100644
index 0000000..e28e801
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/slot/TimerService.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskexecutor.slot;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Service to register timeouts for a given key. The timeouts are identified 
by a ticket so that
+ * newly registered timeouts for the same key can be distinguished from older 
timeouts.
+ *
+ * @param <K> Type of the key
+ */
+public class TimerService<K> {
+
+       /** Executor service for the scheduled timeouts */
+       private final ScheduledExecutorService scheduledExecutorService;
+
+       /** Map of currently active timeouts */
+       private final Map<K, Timeout<K>> timeouts;
+
+       /** Listener which is notified about occurring timeouts */
+       private TimeoutListener<K> timeoutListener;
+
+       public TimerService(final ScheduledExecutorService 
scheduledExecutorService) {
+               this.scheduledExecutorService = 
Preconditions.checkNotNull(scheduledExecutorService);
+
+               this.timeouts = new HashMap<>(16);
+               this.timeoutListener = null;
+       }
+
+       public void start(TimeoutListener<K> initialTimeoutListener) {
+               // sanity check; We only allow to assign a timeout listener once
+               
Preconditions.checkState(!scheduledExecutorService.isShutdown());
+               Preconditions.checkState(timeoutListener == null);
+
+               this.timeoutListener = 
Preconditions.checkNotNull(initialTimeoutListener);
+       }
+
+       public void stop() {
+               for (K key: timeouts.keySet()) {
+                       unregisterTimeout(key);
+               }
+
+               timeoutListener = null;
+
+               scheduledExecutorService.shutdown();
+       }
+
+       /**
+        * Register a timeout for the given key which shall occur in the given 
delay.
+        *
+        * @param key for which to register the timeout
+        * @param delay until the timeout
+        * @param unit of the timeout delay
+        */
+       public void registerTimeout(final K key, final long delay, final 
TimeUnit unit) {
+               Preconditions.checkState(timeoutListener != null, "The " + 
getClass().getSimpleName() +
+                       " has not been started.");
+
+               if (timeouts.containsKey(key)) {
+                       unregisterTimeout(key);
+               }
+
+               timeouts.put(key, new Timeout<>(timeoutListener, key, delay, 
unit, scheduledExecutorService));
+       }
+
+       /**
+        * Unregister the timeout for the given key.
+        *
+        * @param key for which to unregister the timeout
+        */
+       public void unregisterTimeout(K key) {
+               Timeout<K> timeout = timeouts.remove(key);
+
+               if (timeout != null) {
+                       timeout.cancel();
+               }
+       }
+
+       /**
+        * Check whether the timeout for the given key and ticket is still 
valid (not yet unregistered
+        * and not yet overwritten).
+        *
+        * @param key for which to check the timeout
+        * @param ticket of the timeout
+        * @return True if the timeout ticket is still valid; otherwise false
+        */
+       public boolean isValid(K key, UUID ticket) {
+               if (timeouts.containsKey(key)) {
+                       Timeout<K> timeout = timeouts.get(key);
+
+                       return timeout.getTicket().equals(ticket);
+               } else {
+                       return false;
+               }
+       }
+
+       // ---------------------------------------------------------------------
+       // Static utility classes
+       // ---------------------------------------------------------------------
+
+       private static final class Timeout<K> implements Runnable {
+
+               private final TimeoutListener<K> timeoutListener;
+               private final K key;
+               private final ScheduledFuture<?> scheduledTimeout;
+               private final UUID ticket;
+
+               Timeout(
+                       final TimeoutListener<K> timeoutListener,
+                       final K key,
+                       final long delay,
+                       final TimeUnit unit,
+                       final ScheduledExecutorService 
scheduledExecutorService) {
+
+                       Preconditions.checkNotNull(scheduledExecutorService);
+
+                       this.timeoutListener = 
Preconditions.checkNotNull(timeoutListener);
+                       this.key = Preconditions.checkNotNull(key);
+                       this.scheduledTimeout = 
scheduledExecutorService.schedule(this, delay, unit);
+                       this.ticket = UUID.randomUUID();
+               }
+
+               UUID getTicket() {
+                       return ticket;
+               }
+
+               void cancel() {
+                       scheduledTimeout.cancel(true);
+               }
+
+               @Override
+               public void run() {
+                       timeoutListener.notifyTimeout(key, ticket);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 977e563..b67737d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -133,7 +133,7 @@ public class Task implements Runnable, TaskActions {
        private final ExecutionAttemptID executionId;
 
        /** ID which identifies the slot in which the task is supposed to run */
-       private final AllocationID allocationID;
+       private final AllocationID allocationId;
 
        /** TaskInfo object for this task */
        private final TaskInfo taskInfo;
@@ -278,7 +278,7 @@ public class Task implements Runnable, TaskActions {
                this.jobId = checkNotNull(tdd.getJobID());
                this.vertexId = checkNotNull(tdd.getVertexID());
                this.executionId  = checkNotNull(tdd.getExecutionId());
-               this.allocationID = checkNotNull(tdd.getAllocationID());
+               this.allocationId = checkNotNull(tdd.getAllocationID());
                this.taskNameWithSubtask = taskInfo.getTaskNameWithSubtasks();
                this.jobConfiguration = checkNotNull(tdd.getJobConfiguration());
                this.taskConfiguration = 
checkNotNull(tdd.getTaskConfiguration());
@@ -385,8 +385,8 @@ public class Task implements Runnable, TaskActions {
                return executionId;
        }
 
-       public AllocationID getAllocationID() {
-               return allocationID;
+       public AllocationID getAllocationId() {
+               return allocationId;
        }
 
        public TaskInfo getTaskInfo() {

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
index af8aa69..97f42b1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartialConsumePipelinedResultTest.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.reader.BufferReader;

http://git-wip-us.apache.org/repos/asf/flink/blob/9da76dcf/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index ecbd9b5..baae251 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -39,6 +39,7 @@ import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequ
 import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 
@@ -83,6 +84,7 @@ public class TaskExecutorTest extends TestLogger {
                                mock(TaskManagerMetricGroup.class),
                                mock(BroadcastVariableManager.class),
                                mock(FileCache.class),
+                               mock(TaskSlotTable.class),
                                mock(FatalErrorHandler.class));
 
                        taskManager.start();
@@ -139,6 +141,7 @@ public class TaskExecutorTest extends TestLogger {
                                mock(TaskManagerMetricGroup.class),
                                mock(BroadcastVariableManager.class),
                                mock(FileCache.class),
+                               mock(TaskSlotTable.class),
                                mock(FatalErrorHandler.class));
 
                        taskManager.start();
@@ -211,6 +214,7 @@ public class TaskExecutorTest extends TestLogger {
                                mock(TaskManagerMetricGroup.class),
                                mock(BroadcastVariableManager.class),
                                mock(FileCache.class),
+                               mock(TaskSlotTable.class),
                                mock(FatalErrorHandler.class));
 
                        taskManager.start();

Reply via email to