[FLINK-4987] Harden SlotPool on JobMaster

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1ba9f11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1ba9f11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1ba9f11

Branch: refs/heads/master
Commit: a1ba9f1126270d53168394211a99e354aa2cf20d
Parents: 8730e20
Author: Stephan Ewen <[email protected]>
Authored: Fri Oct 21 16:51:34 2016 +0200
Committer: Stephan Ewen <[email protected]>
Committed: Fri Dec 23 20:54:25 2016 +0100

----------------------------------------------------------------------
 .../clusterframework/types/ResourceProfile.java |   18 +-
 .../org/apache/flink/runtime/instance/Slot.java |   16 +
 .../flink/runtime/instance/SlotDescriptor.java  |  162 ---
 .../apache/flink/runtime/instance/SlotPool.java | 1026 +++++++++++-------
 .../flink/runtime/instance/SlotPoolGateway.java |   95 ++
 .../runtime/jobmanager/scheduler/Locality.java  |   26 +-
 .../scheduler/NoResourceAvailableException.java |    6 +-
 .../runtime/jobmanager/slots/AllocatedSlot.java |   58 +-
 .../jobmanager/slots/PooledSlotProvider.java    |   73 --
 .../jobmanager/slots/SlotAndLocality.java       |   55 +
 .../flink/runtime/jobmaster/JobMaster.java      |  102 +-
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |   18 +-
 .../apache/flink/runtime/util/clock/Clock.java  |   40 +
 .../flink/runtime/util/clock/SystemClock.java   |   57 +
 .../types/ResourceProfileTest.java              |    5 +
 .../runtime/instance/AllocatedSlotsTest.java    |  270 ++---
 .../runtime/instance/AvailableSlotsTest.java    |  247 +++--
 .../flink/runtime/instance/SlotPoolTest.java    |  596 +++++-----
 .../runtime/minicluster/MiniClusterITCase.java  |   13 +-
 19 files changed, 1650 insertions(+), 1233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
index 7a25de1..ddc7547 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java
@@ -18,14 +18,21 @@
 
 package org.apache.flink.runtime.clusterframework.types;
 
+import javax.annotation.Nonnull;
 import java.io.Serializable;
 
 /**
  * Describe the resource profile of the slot, either when requiring or 
offering it. The profile can be
  * checked whether it can match another profile's requirement, and furthermore 
we may calculate a matching
  * score to decide which profile we should choose when we have lots of 
candidate slots.
+ * 
+ * <p>Resource Profiles have a total ordering, defined by comparing these 
fields in sequence:
+ * <ol>
+ *     <li>Memory Size</li>
+ *     <li>CPU cores</li>
+ * </ol>
  */
-public class ResourceProfile implements Serializable {
+public class ResourceProfile implements Serializable, 
Comparable<ResourceProfile> {
 
        private static final long serialVersionUID = 1L;
 
@@ -90,11 +97,18 @@ public class ResourceProfile implements Serializable {
                return cpuCores >= required.getCpuCores() && memoryInMB >= 
required.getMemoryInMB();
        }
 
+       @Override
+       public int compareTo(@Nonnull ResourceProfile other) {
+               int cmp1 = Long.compare(this.memoryInMB, other.memoryInMB);
+               int cmp2 = Double.compare(this.cpuCores, other.cpuCores);
+               return (cmp1 != 0) ? cmp1 : cmp2; 
+       }
+
        // 
------------------------------------------------------------------------
 
        @Override
        public int hashCode() {
-               long cpuBits = Double.doubleToLongBits(cpuCores);
+               long cpuBits = Double.doubleToRawLongBits(cpuCores);
                return (int) (cpuBits ^ (cpuBits >>> 32) ^ memoryInMB ^ 
(memoryInMB >> 32));
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
index 8f8b897..d6d8f12 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/Slot.java
@@ -350,6 +350,22 @@ public abstract class Slot {
        //  Utilities
        // 
--------------------------------------------------------------------------------------------
 
+       /**
+        * Slots must always hash based on reference identity.
+        */
+       @Override
+       public final int hashCode() {
+               return super.hashCode();
+       }
+
+       /**
+        * Slots must always compare on referential equality.
+        */
+       @Override
+       public final boolean equals(Object obj) {
+               return this == obj;
+       }
+
        @Override
        public String toString() {
                return hierarchy() + " - " + getTaskManagerLocation() + " - " + 
getStateName(status);

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
deleted file mode 100644
index 47ce422..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotDescriptor.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.instance;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * The description of slots, TaskManagers offer one or more task slots, which 
define a slice of
- * their resources. This description will contain some static information 
about the slot, such
- * as the location and numeric id of the slot, rpc gateway to communicate with 
the TaskManager which
- * owns the slot.
- */
-public class SlotDescriptor {
-
-       /** The ID of the job this slice belongs to. */
-       private final JobID jobID;
-
-       /** The location information of the TaskManager to which this slot 
belongs */
-       private final TaskManagerLocation taskManagerLocation;
-
-       /** The number of the slot on which the task is deployed */
-       private final int slotNumber;
-
-       /** The resource profile of the slot provides */
-       private final ResourceProfile resourceProfile;
-
-       /** TEMP until the new RPC is in place: The actor gateway to 
communicate with the TaskManager */
-       private final TaskManagerGateway taskManagerGateway;
-
-       public SlotDescriptor(
-               final JobID jobID,
-               final TaskManagerLocation location,
-               final int slotNumber,
-               final ResourceProfile resourceProfile,
-               final TaskManagerGateway taskManagerGateway)
-       {
-               this.jobID = checkNotNull(jobID);
-               this.taskManagerLocation = checkNotNull(location);
-               this.slotNumber = slotNumber;
-               this.resourceProfile = checkNotNull(resourceProfile);
-               this.taskManagerGateway = checkNotNull(taskManagerGateway);
-       }
-
-       public SlotDescriptor(final SlotDescriptor other) {
-               this.jobID = other.jobID;
-               this.taskManagerLocation = other.taskManagerLocation;
-               this.slotNumber = other.slotNumber;
-               this.resourceProfile = other.resourceProfile;
-               this.taskManagerGateway = other.taskManagerGateway;
-       }
-       
-       // TODO - temporary workaround until we have the SlotDesriptor in the 
Slot
-       public SlotDescriptor(final Slot slot) {
-               this.jobID = slot.getJobID();
-               this.taskManagerLocation = slot.getTaskManagerLocation();
-               this.slotNumber = slot.getRootSlotNumber();
-               this.resourceProfile = new ResourceProfile(0, 0);
-               this.taskManagerGateway = slot.getTaskManagerGateway();
-       }
-
-       /**
-        * Returns the ID of the job this allocated slot belongs to.
-        *
-        * @return the ID of the job this allocated slot belongs to
-        */
-       public JobID getJobID() {
-               return jobID;
-       }
-
-       /**
-        * Gets the number of the slot.
-        *
-        * @return The number of the slot on the TaskManager.
-        */
-       public int getSlotNumber() {
-               return slotNumber;
-       }
-
-       /**
-        * Gets the resource profile of the slot.
-        *
-        * @return The resource profile of the slot.
-        */
-       public ResourceProfile getResourceProfile() {
-               return resourceProfile;
-       }
-
-       /**
-        * Gets the location info of the TaskManager that offers this slot.
-        *
-        * @return The location info of the TaskManager that offers this slot
-        */
-       public TaskManagerLocation getTaskManagerLocation() {
-               return taskManagerLocation;
-       }
-
-       /**
-        * Gets the actor gateway that can be used to send messages to the 
TaskManager.
-        * <p>
-        * This method should be removed once the new interface-based RPC 
abstraction is in place
-        *
-        * @return The actor gateway that can be used to send messages to the 
TaskManager.
-        */
-       public TaskManagerGateway getTaskManagerGateway() {
-               return taskManagerGateway;
-       }
-
-       @Override
-       public boolean equals(Object o) {
-               if (this == o) {
-                       return true;
-               }
-               if (o == null || getClass() != o.getClass()) {
-                       return false;
-               }
-
-               SlotDescriptor that = (SlotDescriptor) o;
-
-               if (slotNumber != that.slotNumber) {
-                       return false;
-               }
-               if (!jobID.equals(that.jobID)) {
-                       return false;
-               }
-               return taskManagerLocation.equals(that.taskManagerLocation);
-
-       }
-
-       @Override
-       public int hashCode() {
-               int result = jobID.hashCode();
-               result = 31 * result + taskManagerLocation.hashCode();
-               result = 31 * result + slotNumber;
-               return result;
-       }
-
-       @Override
-       public String toString() {
-               return taskManagerLocation + " - " + slotNumber;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 44df29b..5a3a321 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -25,25 +25,41 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
 import org.apache.flink.runtime.jobmanager.slots.SlotOwner;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRegistered;
 import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestRejected;
 import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.util.clock.Clock;
+import org.apache.flink.runtime.util.clock.SystemClock;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -58,18 +74,33 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  * <p>
  * All the allocation or the slot offering will be identified by self 
generated AllocationID, we will use it to
  * eliminate ambiguities.
+ * 
+ * TODO : Make pending requests location preference aware
+ * TODO : Pass location preferences to the ResourceManager when sending a 
slot request
  */
-public class SlotPool implements SlotOwner {
+public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
+
+       /** The log for the pool - shared also with the internal classes */
+       static final Logger LOG = LoggerFactory.getLogger(SlotPool.class);
+
+       // 
------------------------------------------------------------------------
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(SlotPool.class);
+       private static final Time DEFAULT_SLOT_REQUEST_TIMEOUT = 
Time.minutes(5);
+
+       private static final Time DEFAULT_RM_ALLOCATION_TIMEOUT = 
Time.minutes(10);
+
+       private static final Time DEFAULT_RM_REQUEST_TIMEOUT = Time.seconds(10);
+
+       // 
------------------------------------------------------------------------
 
        private final Object lock = new Object();
 
-       /** The executor which is used to execute futures */
-       private final Executor executor;
+       private final JobID jobId;
 
-       /** All registered resources, slots will be accepted and used only if 
the resource is registered */
-       private final Set<ResourceID> registeredResources;
+       private final ProviderAndOwner providerAndOwner;
+
+       /** All registered TaskManagers; slots will be accepted and used only 
if their TaskManager is registered */
+       private final HashSet<ResourceID> registeredTaskManagers;
 
        /** The book-keeping of all allocated slots */
        private final AllocatedSlots allocatedSlots;
@@ -78,10 +109,15 @@ public class SlotPool implements SlotOwner {
        private final AvailableSlots availableSlots;
 
        /** All pending requests waiting for slots */
-       private final Map<AllocationID, Tuple2<SlotRequest, 
FlinkCompletableFuture<SlotDescriptor>>> pendingRequests;
+       private final HashMap<AllocationID, PendingRequest> pendingRequests;
+
+       /** Timeout for request calls to the ResourceManager */
+       private final Time resourceManagerRequestsTimeout;
 
-       /** Timeout of slot allocation */
-       private final Time timeout;
+       /** Timeout for allocation round trips (RM -> launch TM -> offer slot) 
*/
+       private final Time resourceManagerAllocationTimeout;
+
+       private final Clock clock;
 
        /** the leader id of job manager */
        private UUID jobManagerLeaderId;
@@ -92,177 +128,238 @@ public class SlotPool implements SlotOwner {
        /** The gateway to communicate with resource manager */
        private ResourceManagerGateway resourceManagerGateway;
 
-       public SlotPool(final Executor executor) {
-               this.executor = executor;
-               this.registeredResources = new HashSet<>();
+       // 
------------------------------------------------------------------------
+
+       public SlotPool(RpcService rpcService, JobID jobId) {
+               this(rpcService, jobId, SystemClock.getInstance(),
+                               DEFAULT_SLOT_REQUEST_TIMEOUT, 
DEFAULT_RM_ALLOCATION_TIMEOUT, DEFAULT_RM_REQUEST_TIMEOUT);
+       }
+
+       public SlotPool(
+                       RpcService rpcService,
+                       JobID jobId,
+                       Clock clock,
+                       Time slotRequestTimeout,
+                       Time resourceManagerAllocationTimeout,
+                       Time resourceManagerRequestTimeout) {
+
+               super(rpcService);
+
+               this.jobId = checkNotNull(jobId);
+               this.clock = checkNotNull(clock);
+               this.resourceManagerRequestsTimeout = 
checkNotNull(resourceManagerRequestTimeout);
+               this.resourceManagerAllocationTimeout = 
checkNotNull(resourceManagerAllocationTimeout);
+
+               this.registeredTaskManagers = new HashSet<>();
                this.allocatedSlots = new AllocatedSlots();
                this.availableSlots = new AvailableSlots();
                this.pendingRequests = new HashMap<>();
-               this.timeout = Time.of(5, TimeUnit.SECONDS);
-       }
 
-       public void setJobManagerLeaderId(final UUID jobManagerLeaderId) {
-               this.jobManagerLeaderId = jobManagerLeaderId;
+               this.providerAndOwner = new ProviderAndOwner(getSelf(), 
slotRequestTimeout);
        }
 
        // 
------------------------------------------------------------------------
-       //  Slot Allocation
+       //  Starting and Stopping
        // 
------------------------------------------------------------------------
 
+       @Override
+       public void start() {
+               throw new UnsupportedOperationException("Should never call 
start() without leader ID");
+       }
+
        /**
-        * Try to allocate a simple slot with specified resource profile.
+        * Start the slot pool to accept RPC calls.
         *
-        * @param jobID           The job id which the slot allocated for
-        * @param resourceProfile The needed resource profile
-        * @return The future of allocated simple slot
+        * @param jobManagerLeaderId The necessary leader id for running the 
job.
         */
-       public Future<SimpleSlot> allocateSimpleSlot(final JobID jobID, final 
ResourceProfile resourceProfile) {
-               return allocateSimpleSlot(jobID, resourceProfile, new 
AllocationID());
-       }
+       public void start(UUID jobManagerLeaderId) {
+               this.jobManagerLeaderId = jobManagerLeaderId;
 
+               // TODO - start should not throw an exception
+               try {
+                       super.start();
+               } catch (Exception e) {
+                       throw new RuntimeException("This should never happen", 
e);
+               }
+       }
 
        /**
-        * Try to allocate a simple slot with specified resource profile and 
specified allocation id. It's mainly
-        * for testing purpose since we need to specify whatever allocation id 
we want.
+        * Suspends this pool, meaning it has lost its authority to accept and 
distribute slots.
         */
-       @VisibleForTesting
-       Future<SimpleSlot> allocateSimpleSlot(
-               final JobID jobID,
-               final ResourceProfile resourceProfile,
-               final AllocationID allocationID)
-       {
-               final FlinkCompletableFuture<SlotDescriptor> future = new 
FlinkCompletableFuture<>();
-
-               internalAllocateSlot(jobID, allocationID, resourceProfile, 
future);
-
-               return future.thenApplyAsync(
-                       new ApplyFunction<SlotDescriptor, SimpleSlot>() {
-                               @Override
-                               public SimpleSlot apply(SlotDescriptor 
descriptor) {
-                                       SimpleSlot slot = new SimpleSlot(
-                                                       descriptor.getJobID(), 
SlotPool.this,
-                                                       
descriptor.getTaskManagerLocation(), descriptor.getSlotNumber(),
-                                                       
descriptor.getTaskManagerGateway());
-                                       synchronized (lock) {
-                                               // double validation since we 
are out of the lock protection after the slot is granted
-                                               if 
(registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID()))
 {
-                                                       
LOG.info("Allocation[{}] Allocated simple slot: {} for job {}.", allocationID, 
slot, jobID);
-                                                       
allocatedSlots.add(allocationID, descriptor, slot);
-                                               }
-                                               else {
-                                                       throw new 
RuntimeException("Resource was marked dead asynchronously.");
-                                               }
-                                       }
-                                       return slot;
-                               }
-                       },
-                       executor
-               );
+       @RpcMethod
+       public void suspend() {
+               validateRunsInMainThread();
+
+               // suspend this RPC endpoint
+               ((StartStoppable) getSelf()).stop();
+
+               // do not accept any requests
+               jobManagerLeaderId = null;
+               resourceManagerLeaderId = null;
+               resourceManagerGateway = null;
+
+               // Clear (but not release!) the available slots. The 
TaskManagers should re-register them
+               // at the new leader JobManager/SlotPool
+               availableSlots.clear();
+               allocatedSlots.clear();
+               pendingRequests.clear();
        }
 
+       // 
------------------------------------------------------------------------
+       //  Getting PoolOwner and PoolProvider
+       // 
------------------------------------------------------------------------
 
        /**
-        * Try to allocate a shared slot with specified resource profile.
-        *
-        * @param jobID                  The job id which the slot allocated for
-        * @param resourceProfile        The needed resource profile
-        * @param sharingGroupAssignment The slot sharing group of the vertex
-        * @return The future of allocated shared slot
+        * Gets the slot owner implementation for this pool.
+        * 
+        * <p>This method does not mutate state and can be called directly (no 
RPC indirection)
+        * 
+        * @return The slot owner implementation for this pool.
         */
-       public Future<SharedSlot> allocateSharedSlot(
-               final JobID jobID,
-               final ResourceProfile resourceProfile,
-               final SlotSharingGroupAssignment sharingGroupAssignment)
-       {
-               return allocateSharedSlot(jobID, resourceProfile, 
sharingGroupAssignment, new AllocationID());
+       public SlotOwner getSlotOwner() {
+               return providerAndOwner;
        }
 
        /**
-        * Try to allocate a shared slot with specified resource profile and 
specified allocation id. It's mainly
-        * for testing purpose since we need to specify whatever allocation id 
we want.
+        * Gets the slot provider implementation for this pool.
+        *
+        * <p>This method does not mutate state and can be called directly (no 
RPC indirection)
+        *
+        * @return The slot provider implementation for this pool.
         */
-       @VisibleForTesting
-       Future<SharedSlot> allocateSharedSlot(
-               final JobID jobID,
-               final ResourceProfile resourceProfile,
-               final SlotSharingGroupAssignment sharingGroupAssignment,
-               final AllocationID allocationID)
-       {
-               final FlinkCompletableFuture<SlotDescriptor> future = new 
FlinkCompletableFuture<>();
-
-               internalAllocateSlot(jobID, allocationID, resourceProfile, 
future);
-
-               return future.thenApplyAsync(
-                       new ApplyFunction<SlotDescriptor, SharedSlot>() {
-                               @Override
-                               public SharedSlot apply(SlotDescriptor 
descriptor) {
-                                       SharedSlot slot = new SharedSlot(
-                                                       descriptor.getJobID(), 
SlotPool.this, descriptor.getTaskManagerLocation(),
-                                                       
descriptor.getSlotNumber(), descriptor.getTaskManagerGateway(),
-                                                       sharingGroupAssignment);
-
-                                       synchronized (lock) {
-                                               // double validation since we 
are out of the lock protection after the slot is granted
-                                               if 
(registeredResources.contains(descriptor.getTaskManagerLocation().getResourceID()))
 {
-                                                       
LOG.info("Allocation[{}] Allocated shared slot: {} for job {}.", allocationID, 
slot, jobID);
-                                                       
allocatedSlots.add(allocationID, descriptor, slot);
-                                               }
-                                               else {
-                                                       throw new 
RuntimeException("Resource was marked dead asynchronously.");
-                                               }
-                                       }
-                                       return slot;
-                               }
-                       },
-                       executor
-               );
+       public SlotProvider getSlotProvider() {
+               return providerAndOwner;
        }
 
-       /**
-        * Internally allocate the slot with specified resource profile. We 
will first check whether we have some
-        * free slot which can meet the requirement already and allocate it 
immediately. Otherwise, we will try to
-        * allocation the slot from resource manager.
-        */
-       private void internalAllocateSlot(
-               final JobID jobID,
-               final AllocationID allocationID,
-               final ResourceProfile resourceProfile,
-               final FlinkCompletableFuture<SlotDescriptor> future)
-       {
-               LOG.info("Allocation[{}] Allocating slot with {} for Job {}.", 
allocationID, resourceProfile, jobID);
-
-               synchronized (lock) {
-                       // check whether we have any free slot which can match 
the required resource profile
-                       SlotDescriptor freeSlot = 
availableSlots.poll(resourceProfile);
-                       if (freeSlot != null) {
-                               future.complete(freeSlot);
-                       }
-                       else {
-                               if (resourceManagerGateway != null) {
-                                       LOG.info("Allocation[{}] No available 
slot exists, trying to allocate from resource manager.",
-                                               allocationID);
-                                       SlotRequest slotRequest = new 
SlotRequest(jobID, allocationID, resourceProfile);
-                                       pendingRequests.put(allocationID, new 
Tuple2<>(slotRequest, future));
-                                       
resourceManagerGateway.requestSlot(jobManagerLeaderId, resourceManagerLeaderId, 
slotRequest, timeout)
-                                               .handleAsync(new 
BiFunction<RMSlotRequestReply, Throwable, Void>() {
-                                                       @Override
-                                                       public Void 
apply(RMSlotRequestReply slotRequestReply, Throwable throwable) {
-                                                               if (throwable 
!= null) {
-                                                                       
future.completeExceptionally(
-                                                                               
new Exception("Slot allocation from resource manager failed", throwable));
-                                                               } else if 
(slotRequestReply instanceof RMSlotRequestRejected) {
-                                                                       
future.completeExceptionally(
-                                                                               
new Exception("Slot allocation rejected by resource manager"));
-                                                               }
-                                                               return null;
-                                                       }
-                                               }, executor);
+       // 
------------------------------------------------------------------------
+       //  Resource Manager Connection
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Stores the leader ID and gateway of the ResourceManager that slot requests are sent to.
+        * Both arguments are passed through checkNotNull before being stored.
+        *
+        * @param resourceManagerLeaderId The leader ID of the ResourceManager
+        * @param resourceManagerGateway The gateway used to request slots from the ResourceManager
+        */
+       @RpcMethod
+       public void connectToResourceManager(UUID resourceManagerLeaderId, 
ResourceManagerGateway resourceManagerGateway) {
+               this.resourceManagerLeaderId = 
checkNotNull(resourceManagerLeaderId);
+               this.resourceManagerGateway = 
checkNotNull(resourceManagerGateway);
+       }
+
+       /**
+        * Clears the stored ResourceManager leader ID and gateway. While the gateway is null,
+        * allocation requests that cannot be served from the available slots fail immediately.
+        */
+       @RpcMethod
+       public void disconnectResourceManager() {
+               this.resourceManagerLeaderId = null;
+               this.resourceManagerGateway = null;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Slot Allocation
+       // 
------------------------------------------------------------------------
+
+       /**
+        * RPC entry point for slot allocation; delegates directly to internalAllocateSlot.
+        *
+        * @param task The task the slot is requested for
+        * @param resources The resource profile the slot must match
+        * @param locationPreferences The preferred TaskManager locations for the slot
+        * @return A future that is completed with the slot, or completed exceptionally
+        */
+       @RpcMethod
+       public Future<SimpleSlot> allocateSlot(
+                       ScheduledUnit task,
+                       ResourceProfile resources,
+                       Iterable<TaskManagerLocation> locationPreferences) {
+
+               return internalAllocateSlot(task, resources, 
locationPreferences);
+       }
+
+       /**
+        * RPC entry point for returning a slot; delegates directly to internalReturnAllocatedSlot.
+        *
+        * @param slot The slot to return to the pool
+        */
+       @RpcMethod
+       public void returnAllocatedSlot(Slot slot) {
+               internalReturnAllocatedSlot(slot);
+       }
+
+
+       /**
+        * Allocates a slot in three steps: (1) serve the request from the available slots,
+        * (2) fail immediately if no ResourceManager is connected, (3) register a pending
+        * request and ask the ResourceManager for a new slot.
+        *
+        * @param task The task the slot is requested for (not used by the allocation logic itself)
+        * @param resources The resource profile the slot must match
+        * @param locationPreferences The preferred TaskManager locations, used when polling available slots
+        * @return A future that is completed with the slot, or completed exceptionally
+        */
+       Future<SimpleSlot> internalAllocateSlot(
+                       ScheduledUnit task,
+                       ResourceProfile resources,
+                       Iterable<TaskManagerLocation> locationPreferences) {
+
+               // (1) do we have a slot available already?
+               SlotAndLocality slotFromPool = availableSlots.poll(resources, 
locationPreferences);
+               if (slotFromPool != null) {
+                       SimpleSlot slot = createSimpleSlot(slotFromPool.slot(), 
slotFromPool.locality());
+                       allocatedSlots.add(slot);
+                       return FlinkCompletableFuture.completed(slot);
+               }
+
+               // (2) no slot available, and no resource manager connection
+               if (resourceManagerGateway == null) {
+                       return FlinkCompletableFuture.completedExceptionally(
+                                       new NoResourceAvailableException("not 
connected to ResourceManager and no slot available"));
+                       
+               }
+
+               // (3) we have a resource manager connection, so let's ask it 
for more resources
+               final FlinkCompletableFuture<SimpleSlot> future = new 
FlinkCompletableFuture<>();
+               final AllocationID allocationID = new AllocationID();
+
+               LOG.info("Requesting slot with profile {} from resource manager 
(request = {}).", resources, allocationID);
+
+               pendingRequests.put(allocationID, new 
PendingRequest(allocationID, future, resources));
+
+               Future<RMSlotRequestReply> rmResponse = 
resourceManagerGateway.requestSlot(
+                               resourceManagerLeaderId, jobManagerLeaderId,
+                               new SlotRequest(jobId, allocationID, resources),
+                               resourceManagerRequestsTimeout);
+
+               // on success, let the slot pool know (runs in the main thread)
+               rmResponse.thenAcceptAsync(new 
AcceptFunction<RMSlotRequestReply>() {
+                       @Override
+                       public void accept(RMSlotRequestReply reply) {
+                               if (reply.getAllocationID() != null && 
reply.getAllocationID().equals(allocationID)) {
+                                       if (reply instanceof 
RMSlotRequestRegistered) {
+                                               
slotRequestToResourceManagerSuccess(allocationID);
+                                       }
+                                       else if (reply instanceof 
RMSlotRequestRejected) {
+                                               
slotRequestToResourceManagerFailed(allocationID,
+                                                               new 
Exception("ResourceManager rejected slot request"));
+                                       }
+                                       else {
+                                               
slotRequestToResourceManagerFailed(allocationID, 
+                                                               new 
Exception("Unknown ResourceManager response: " + reply));
+                                       }
                                }
                                else {
-                                       LOG.warn("Allocation[{}] Resource 
manager not available right now.", allocationID);
-                                       future.completeExceptionally(new 
Exception("Resource manager not available right now."));
+                                       // drop the pending request as well, so that a later slot offer or a returned
+                                       // slot is not matched to this already-failed future and lost
+                                       pendingRequests.remove(allocationID);
+                                       future.completeExceptionally(new 
Exception(String.format(
+                                                       "Bug: ResourceManager 
response had wrong AllocationID. Request: %s , Response: %s", 
+                                                       allocationID, 
reply.getAllocationID())));
                                }
                        }
+               }, getMainThreadExecutor());
+
+               // on failure, fail the request future
+               rmResponse.exceptionallyAsync(new ApplyFunction<Throwable, 
Void>() {
+
+                       @Override
+                       public Void apply(Throwable failure) {
+                               
slotRequestToResourceManagerFailed(allocationID, failure);
+                               return null;
+                       }
+               }, getMainThreadExecutor());
+
+               return future;
+       }
+
+       /**
+        * Called once the ResourceManager has registered the slot request. Schedules a
+        * timeout check for the request after resourceManagerAllocationTimeout.
+        *
+        * @param allocationID The ID of the request that was registered
+        */
+       private void slotRequestToResourceManagerSuccess(final AllocationID 
allocationID) {
+               // the request is now pending between the ResourceManager and a (future) TaskManager;
+               // all we do here is add the watcher that fires if that request times out
+               scheduleRunAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               checkTimeoutSlotAllocation(allocationID);
+                       }
+               }, resourceManagerAllocationTimeout);
+       }
+
+       /**
+        * Removes the pending request for the given allocation ID and, if it was still
+        * registered, fails its future with a NoResourceAvailableException wrapping the failure.
+        *
+        * @param allocationID The ID of the request that failed
+        * @param failure The cause reported for the failed request
+        */
+       private void slotRequestToResourceManagerFailed(AllocationID 
allocationID, Throwable failure) {
+               PendingRequest request = pendingRequests.remove(allocationID);
+               // a null here means the request is no longer pending; nothing to fail
+               if (request != null) {
+                       request.future().completeExceptionally(new 
NoResourceAvailableException(
+                                       "No pooled slot available and request 
to ResourceManager for new slot failed", failure));
+               }
+       }
+
+       /**
+        * Timeout watcher for a slot request. Removes the pending request for the given
+        * allocation ID and fails its future with a TimeoutException unless it is already done.
+        *
+        * @param allocationID The ID of the request to check
+        */
+       private void checkTimeoutSlotAllocation(AllocationID allocationID) {
+               PendingRequest request = pendingRequests.remove(allocationID);
+               // no entry means the request is no longer pending, so there is nothing to time out
+               if (request != null && !request.future().isDone()) {
+                       request.future().completeExceptionally(new 
TimeoutException("Slot allocation request timed out"));
                }
        }
 
@@ -275,123 +372,123 @@ public class SlotPool implements SlotOwner {
         * slot can be reused by other pending requests if the resource profile 
matches.
         *
         * @param slot The slot needs to be returned
-        * @return True if the returning slot been accepted
         */
-       @Override
-       public boolean returnAllocatedSlot(Slot slot) {
+       private void internalReturnAllocatedSlot(Slot slot) {
                checkNotNull(slot);
                checkArgument(!slot.isAlive(), "slot is still alive");
-               checkArgument(slot.getOwner() == this, "slot belongs to the 
wrong pool.");
+               checkArgument(slot.getOwner() == providerAndOwner, "slot 
belongs to the wrong pool.");
 
+               // markReleased() is an atomic check-and-set operation, so that 
the slot is guaranteed
+               // to be returned only once
                if (slot.markReleased()) {
-                       synchronized (lock) {
-                               final SlotDescriptor slotDescriptor = 
allocatedSlots.remove(slot);
-                               if (slotDescriptor != null) {
-                                       // check if this TaskManager is valid
-                                       if 
(!registeredResources.contains(slot.getTaskManagerID())) {
-                                               return false;
-                                       }
-
-                                       final 
FlinkCompletableFuture<SlotDescriptor> pendingRequest = 
pollPendingRequest(slotDescriptor);
-                                       if (pendingRequest != null) {
-                                               
pendingRequest.complete(slotDescriptor);
-                                       }
-                                       else {
-                                               
availableSlots.add(slotDescriptor);
-                                       }
-
-                                       return true;
+                       if (allocatedSlots.remove(slot)) {
+                               // this slot allocation is still valid, use the 
slot to fulfill another request
+                               // or make it available again
+                               final AllocatedSlot taskManagerSlot = 
slot.getAllocatedSlot();
+                               final PendingRequest pendingRequest = 
pollMatchingPendingRequest(taskManagerSlot);
+       
+                               if (pendingRequest != null) {
+                                       LOG.debug("Fulfilling pending request 
[{}] early with returned slot [{}]",
+                                                       
pendingRequest.allocationID(), taskManagerSlot.getSlotAllocationId());
+
+                                       // the slot is handed out again, so it must be tracked as allocated before the
+                                       // future completes; otherwise its eventual return would be dropped and the slot lost
+                                       final SimpleSlot newSlot = createSimpleSlot(taskManagerSlot, Locality.UNKNOWN);
+                                       allocatedSlots.add(newSlot);
+                                       pendingRequest.future().complete(newSlot);
                                }
                                else {
-                                       throw new 
IllegalArgumentException("Slot was not allocated from this pool.");
+                                       LOG.debug("Adding returned slot [{}] to 
available slots", taskManagerSlot.getSlotAllocationId());
+                                       availableSlots.add(taskManagerSlot, 
clock.relativeTimeMillis());
                                }
                        }
-               }
-               else {
-                       return false;
+                       else {
+                               // the slot is no longer tracked as allocated, so it is not reused
+                               LOG.debug("Returned slot's allocation has been 
failed. Dropping slot.");
+                       }
                }
        }
 
-       private FlinkCompletableFuture<SlotDescriptor> pollPendingRequest(final 
SlotDescriptor slotDescriptor) {
-               for (Map.Entry<AllocationID, Tuple2<SlotRequest, 
FlinkCompletableFuture<SlotDescriptor>>> entry : pendingRequests.entrySet()) {
-                       final Tuple2<SlotRequest, 
FlinkCompletableFuture<SlotDescriptor>> pendingRequest = entry.getValue();
-                       if 
(slotDescriptor.getResourceProfile().isMatching(pendingRequest.f0.getResourceProfile()))
 {
-                               pendingRequests.remove(entry.getKey());
-                               return pendingRequest.f1;
+       /**
+        * Finds a pending request whose resource profile is matched by the given slot,
+        * removes it from the pending requests, and returns it. The first match in the
+        * iteration order of pendingRequests.values() wins.
+        *
+        * @param slot The slot that should fulfill a pending request
+        * @return The removed pending request, or null if there is no matching request
+        */
+       private PendingRequest pollMatchingPendingRequest(final AllocatedSlot 
slot) {
+               final ResourceProfile slotResources = slot.getResourceProfile();
+
+               for (PendingRequest request : pendingRequests.values()) {
+                       if 
(slotResources.isMatching(request.resourceProfile())) {
+                               // the map is modified while iterating, but we return right away,
+                               // so the values() iterator is never advanced after the removal
+                               pendingRequests.remove(request.allocationID());
+                               return request;
                        }
                }
+
+               // no request pending, or no request matches
                return null;
        }
 
-       /**
-        * Release slot to TaskManager, called for finished tasks or canceled 
jobs.
-        *
-        * @param slot The slot needs to be released.
-        */
-       public void releaseSlot(final Slot slot) {
-               synchronized (lock) {
-                       allocatedSlots.remove(slot);
-                       availableSlots.remove(new SlotDescriptor(slot));
-                       // TODO: send release request to task manager
+       /**
+        * Offers a batch of slots to the pool by calling offerSlot for each of them.
+        *
+        * @param offers Pairs of the offered slot (f0) and the offer it belongs to (f1)
+        * @return The offers (f1) whose slots were accepted; an empty list if none was accepted
+        */
+       @RpcMethod
+       public Iterable<SlotOffer> offerSlots(Iterable<Tuple2<AllocatedSlot, 
SlotOffer>> offers) {
+               validateRunsInMainThread();
+
+               final ArrayList<SlotOffer> result = new ArrayList<>();
+               for (Tuple2<AllocatedSlot, SlotOffer> offer : offers) {
+                       if (offerSlot(offer.f0)) {
+                               result.add(offer.f1);
+                       }
                }
-       }
 
+               return result.isEmpty() ? Collections.<SlotOffer>emptyList() : 
result;
+       }
+       
        /**
         * Slot offering by TaskManager with AllocationID. The AllocationID is 
originally generated by this pool and
         * transfer through the ResourceManager to TaskManager. We use it to 
distinguish the different allocation
         * we issued. Slot offering may be rejected if we find something 
mismatching or there is actually no pending
         * request waiting for this slot (maybe fulfilled by some other 
returned slot).
         *
-        * @param allocationID   The allocation id of the lo
-        * @param slotDescriptor The offered slot descriptor
+        * @param slot The offered slot
         * @return True if we accept the offering
         */
-       public boolean offerSlot(final AllocationID allocationID, final 
SlotDescriptor slotDescriptor) {
-               synchronized (lock) {
-                       // check if this TaskManager is valid
-                       final ResourceID resourceID = 
slotDescriptor.getTaskManagerLocation().getResourceID();
-                       if (!registeredResources.contains(resourceID)) {
-                               LOG.warn("Allocation[{}] Slot offering from 
unregistered TaskManager: {}",
-                                       allocationID, slotDescriptor);
-                               return false;
-                       }
+       @RpcMethod
+       public boolean offerSlot(final AllocatedSlot slot) {
+               validateRunsInMainThread();
 
-                       // check whether we have already using this slot
-                       final Slot allocatedSlot = 
allocatedSlots.get(allocationID);
-                       if (allocatedSlot != null) {
-                               final SlotDescriptor allocatedSlotDescriptor = 
new SlotDescriptor(allocatedSlot);
+               // check if this TaskManager is valid
+               final ResourceID resourceID = slot.getTaskManagerId();
+               final AllocationID allocationID = slot.getSlotAllocationId();
 
-                               if 
(allocatedSlotDescriptor.equals(slotDescriptor)) {
-                                       LOG.debug("Allocation[{}] Duplicated 
slot offering: {}",
-                                               allocationID, slotDescriptor);
-                                       return true;
-                               }
-                               else {
-                                       LOG.info("Allocation[{}] Allocation had 
been fulfilled by slot {}, rejecting offered slot {}",
-                                               allocationID, 
allocatedSlotDescriptor, slotDescriptor);
-                                       return false;
-                               }
-                       }
+               if (!registeredTaskManagers.contains(resourceID)) {
+                       LOG.debug("Received outdated slot offering [{}] from 
unregistered TaskManager: {}",
+                                       slot.getSlotAllocationId(), slot);
+                       return false;
+               }
 
-                       // check whether we already have this slot in free pool
-                       if (availableSlots.contains(slotDescriptor)) {
-                               LOG.debug("Allocation[{}] Duplicated slot 
offering: {}",
-                                       allocationID, slotDescriptor);
-                               return true;
-                       }
+               // check whether we already hold this slot, either as allocated or as available
+               if (allocatedSlots.contains(allocationID) || 
availableSlots.contains(allocationID)) {
+                       LOG.debug("Received repeated offer for slot [{}]. 
Ignoring.", allocationID);
 
-                       // check whether we have request waiting for this slot
-                       if (pendingRequests.containsKey(allocationID)) {
-                               FlinkCompletableFuture<SlotDescriptor> future = 
pendingRequests.remove(allocationID).f1;
-                               future.complete(slotDescriptor);
-                               return true;
-                       }
+                       // return true here so that the sender gets a positive acknowledgement
+                       // to the retry and marks the offering as a success
+                       return true;
+               }
 
-                       // unwanted slot, rejecting this offer
-                       return false;
+               // check whether we have a request waiting for exactly this allocation ID
+               PendingRequest pendingRequest = 
pendingRequests.remove(allocationID);
+               if (pendingRequest != null) {
+                       // we were waiting for this!
+                       SimpleSlot resultSlot = createSimpleSlot(slot, 
Locality.UNKNOWN);
+                       pendingRequest.future().complete(resultSlot);
+                       allocatedSlots.add(resultSlot);
+               }
+               else {
+                       // we were actually not waiting for this:
+                       //   - could be that this request had been fulfilled
+                       //   - we are receiving the slots from TaskManagers after becoming leaders
+                       availableSlots.add(slot, clock.relativeTimeMillis());
                }
+
+               // we accepted the offer in any case; the intent is that the slot is released
+               // after it idled for too long and timed out
+               return true;
        }
 
+       
+       // TODO - periodic (every minute or so) catch slots that were lost 
(check all slots, if they have any task active)
+
+       // TODO - release slots that were not used to the resource manager
+
        // 
------------------------------------------------------------------------
        //  Error Handling
        // 
------------------------------------------------------------------------
@@ -405,24 +502,29 @@ public class SlotPool implements SlotOwner {
         * @param allocationID Represents the allocation which should be failed
         * @param cause        The cause of the failure
         */
+       @RpcMethod
        public void failAllocation(final AllocationID allocationID, final 
Exception cause) {
-               synchronized (lock) {
-                       // 1. check whether the allocation still pending
-                       Tuple2<SlotRequest, 
FlinkCompletableFuture<SlotDescriptor>> pendingRequest =
-                                       pendingRequests.get(allocationID);
-                       if (pendingRequest != null) {
-                               pendingRequest.f1.completeExceptionally(cause);
-                               return;
+               final PendingRequest pendingRequest = 
pendingRequests.remove(allocationID);
+               if (pendingRequest != null) {
+                       // request was still pending
+                       LOG.debug("Failed pending request [{}] with ", 
allocationID, cause);
+                       pendingRequest.future().completeExceptionally(cause);
+               }
+               else if (availableSlots.tryRemove(allocationID)) {
+                       LOG.debug("Failed available slot [{}] with ", 
allocationID, cause);
+               }
+               else {
+                       Slot slot = allocatedSlots.remove(allocationID);
+                       if (slot != null) {
+                               // release the slot.
+                               // since it is no longer in 'allocatedSlots', it will be dropped on return
+                               slot.releaseSlot();
+                       }
+                       else {
+                               LOG.debug("Outdated request to fail slot [{}] 
with ", allocationID, cause);
                        }
-
-                       // 2. check whether we have a free slot corresponding 
to this allocation id
-                       // TODO: add allocation id to slot descriptor, so we 
can remove it by allocation id
-
-                       // 3. check whether we have a in-use slot corresponding 
to this allocation id
-                       // TODO: needs mechanism to release the in-use Slot but 
don't return it back to this pool
-
-                       // TODO: add some unit tests when the previous two are 
ready, the allocation may failed at any phase
                }
+               // TODO: add unit tests; the allocation may fail at any phase (pending, available, allocated)
        }
 
        // 
------------------------------------------------------------------------
@@ -435,10 +537,9 @@ public class SlotPool implements SlotOwner {
         *
         * @param resourceID The id of the TaskManager
         */
-       public void registerResource(final ResourceID resourceID) {
-               synchronized (lock) {
-                       registeredResources.add(resourceID);
-               }
+       @RpcMethod
+       public void registerTaskManager(final ResourceID resourceID) {
+               // slot offers are only accepted from TaskManagers contained in this set
+               registeredTaskManagers.add(resourceID);
        }
 
        /**
@@ -447,12 +548,12 @@ public class SlotPool implements SlotOwner {
         *
         * @param resourceID The id of the TaskManager
         */
-       public void releaseResource(final ResourceID resourceID) {
-               synchronized (lock) {
-                       registeredResources.remove(resourceID);
-                       availableSlots.removeByResource(resourceID);
+       @RpcMethod
+       public void releaseTaskManager(final ResourceID resourceID) {
+               if (registeredTaskManagers.remove(resourceID)) {
+                       availableSlots.removeAllForTaskManager(resourceID);
 
-                       final Set<Slot> allocatedSlotsForResource = 
allocatedSlots.getSlotsByResource(resourceID);
+                       final Set<Slot> allocatedSlotsForResource = 
allocatedSlots.removeSlotsForTaskManager(resourceID);
                        for (Slot slot : allocatedSlotsForResource) {
                                slot.releaseSlot();
                        }
@@ -460,24 +561,15 @@ public class SlotPool implements SlotOwner {
        }
 
        // 
------------------------------------------------------------------------
-       //  ResourceManager
+       //  Utilities
        // 
------------------------------------------------------------------------
 
-       public void setResourceManager(
-               final UUID resourceManagerLeaderId,
-               final ResourceManagerGateway resourceManagerGateway)
-       {
-               synchronized (lock) {
-                       this.resourceManagerLeaderId = resourceManagerLeaderId;
-                       this.resourceManagerGateway = resourceManagerGateway;
-               }
-       }
-
-       public void disconnectResourceManager() {
-               synchronized (lock) {
-                       this.resourceManagerLeaderId = null;
-                       this.resourceManagerGateway = null;
+       /**
+        * Wraps the given allocated slot into a new SimpleSlot that has providerAndOwner
+        * as its owner and the slot number of the allocated slot.
+        *
+        * @param slot The allocated slot to wrap
+        * @param locality The locality to set on the new slot; skipped if null
+        * @return The newly created SimpleSlot
+        */
+       private SimpleSlot createSimpleSlot(AllocatedSlot slot, Locality 
locality) {
+               SimpleSlot result = new SimpleSlot(slot, providerAndOwner, 
slot.getSlotNumber());
+               if (locality != null) {
+                       result.setLocality(locality);
                }
+               return result;
        }
 
        // 
------------------------------------------------------------------------
@@ -487,45 +579,34 @@ public class SlotPool implements SlotOwner {
        /**
         * Organize allocated slots from different points of view.
         */
-       static class AllocatedSlots {
+       private static class AllocatedSlots {
 
                /** All allocated slots organized by TaskManager's id */
-               private final Map<ResourceID, Set<Slot>> 
allocatedSlotsByResource;
-
-               /** All allocated slots organized by Slot object */
-               private final Map<Slot, AllocationID> allocatedSlots;
-
-               /** All allocated slot descriptors organized by Slot object */
-               private final Map<Slot, SlotDescriptor> 
allocatedSlotsWithDescriptor;
+               private final Map<ResourceID, Set<Slot>> 
allocatedSlotsByTaskManager;
 
                /** All allocated slots organized by AllocationID */
                private final Map<AllocationID, Slot> allocatedSlotsById;
 
                AllocatedSlots() {
-                       this.allocatedSlotsByResource = new HashMap<>();
-                       this.allocatedSlots = new HashMap<>();
-                       this.allocatedSlotsWithDescriptor = new HashMap<>();
+                       this.allocatedSlotsByTaskManager = new HashMap<>();
                        this.allocatedSlotsById = new HashMap<>();
                }
 
                /**
-                * Add a new allocation
+                * Adds a new slot to this collection.
                 *
-                * @param allocationID The allocation id
-                * @param slot         The allocated slot
+                * @param slot The allocated slot
                 */
-               void add(final AllocationID allocationID, final SlotDescriptor 
descriptor, final Slot slot) {
-                       allocatedSlots.put(slot, allocationID);
-                       allocatedSlotsById.put(allocationID, slot);
-                       allocatedSlotsWithDescriptor.put(slot, descriptor);
+               void add(Slot slot) {
+                       // index 1: by the allocation id of the underlying allocated slot
+                       
allocatedSlotsById.put(slot.getAllocatedSlot().getSlotAllocationId(), slot);
 
                        final ResourceID resourceID = slot.getTaskManagerID();
-                       Set<Slot> slotsForResource = 
allocatedSlotsByResource.get(resourceID);
-                       if (slotsForResource == null) {
-                               slotsForResource = new HashSet<>();
-                               allocatedSlotsByResource.put(resourceID, 
slotsForResource);
+                       // index 2: by TaskManager; the per-TaskManager set is created on first use
+                       Set<Slot> slotsForTaskManager = 
allocatedSlotsByTaskManager.get(resourceID);
+                       if (slotsForTaskManager == null) {
+                               slotsForTaskManager = new HashSet<>();
+                               allocatedSlotsByTaskManager.put(resourceID, 
slotsForTaskManager);
                        }
-                       slotsForResource.add(slot);
+                       slotsForTaskManager.add(slot);
                }
 
                /**
@@ -541,11 +622,11 @@ public class SlotPool implements SlotOwner {
                /**
                 * Check whether we have allocated this slot
                 *
-                * @param slot The slot needs to checked
+                * @param slotAllocationId The allocation id of the slot to 
check
                 * @return True if we contains this slot
                 */
-               boolean contains(final Slot slot) {
-                       return allocatedSlots.containsKey(slot);
+               boolean contains(AllocationID slotAllocationId) {
+                       // the by-ID index is the only one consulted here
+                       return allocatedSlotsById.containsKey(slotAllocationId);
                }
 
                /**
@@ -553,25 +634,27 @@ public class SlotPool implements SlotOwner {
                 *
                 * @param slot The slot needs to be removed
                 */
-               SlotDescriptor remove(final Slot slot) {
-                       final SlotDescriptor descriptor = 
allocatedSlotsWithDescriptor.remove(slot);
-                       if (descriptor != null) {
-                               final AllocationID allocationID = 
allocatedSlots.remove(slot);
-                               if (allocationID != null) {
-                                       allocatedSlotsById.remove(allocationID);
-                               } else {
-                                       throw new IllegalStateException("Bug: 
maps are inconsistent");
-                               }
+               boolean remove(final Slot slot) {
+                       // removal is keyed by the allocation id of the underlying allocated slot, not by the
+                       // Slot object itself; returns true only if a slot was registered under that id
+                       return 
remove(slot.getAllocatedSlot().getSlotAllocationId()) != null;
+               }
 
-                               final ResourceID resourceID = 
slot.getTaskManagerID();
-                               final Set<Slot> slotsForResource = 
allocatedSlotsByResource.get(resourceID);
-                               slotsForResource.remove(slot);
-                               if (slotsForResource.isEmpty()) {
-                                       
allocatedSlotsByResource.remove(resourceID);
+               /**
+                * Removes the slot registered under the given allocation ID from both indexes.
+                *
+                * @param slotId The ID of the slot to be removed
+                * @return The removed slot, or null if no slot was registered under the ID
+                */
+               Slot remove(final AllocationID slotId) {
+                       Slot slot = allocatedSlotsById.remove(slotId);
+                       if (slot != null) {
+                               final ResourceID taskManagerId = 
slot.getTaskManagerID();
+                               Set<Slot> slotsForTM = 
allocatedSlotsByTaskManager.get(taskManagerId);
+                               slotsForTM.remove(slot);
+                               if (slotsForTM.isEmpty()) {
+                                       // drop the empty set so the TaskManager no longer shows up in the index
+                                       allocatedSlotsByTaskManager.remove(taskManagerId);
                                }
-                               
-                               return descriptor;
-                       } else {
+                               return slot;
+                       }
+                       else {
                                return null;
                        }
                }
@@ -582,119 +665,326 @@ public class SlotPool implements SlotOwner {
                 * @param resourceID The id of the TaskManager
                 * @return Set of slots which are allocated from the same 
TaskManager
                 */
-               Set<Slot> getSlotsByResource(final ResourceID resourceID) {
-                       Set<Slot> slotsForResource = 
allocatedSlotsByResource.get(resourceID);
-                       if (slotsForResource != null) {
-                               return new HashSet<>(slotsForResource);
+               Set<Slot> removeSlotsForTaskManager(final ResourceID 
resourceID) {
+                       Set<Slot> slotsForTaskManager = 
allocatedSlotsByTaskManager.remove(resourceID);
+                       if (slotsForTaskManager != null) {
+                               for (Slot slot : slotsForTaskManager) {
+                                       
allocatedSlotsById.remove(slot.getAllocatedSlot().getSlotAllocationId());
+                               }
+                               return slotsForTaskManager;
                        }
                        else {
-                               return new HashSet<>();
+                               return Collections.emptySet();
                        }
                }
 
+               void clear() {
+                       allocatedSlotsById.clear();
+                       allocatedSlotsByTaskManager.clear();
+               }
+
                @VisibleForTesting
                boolean containResource(final ResourceID resourceID) {
-                       return allocatedSlotsByResource.containsKey(resourceID);
+                       return 
allocatedSlotsByTaskManager.containsKey(resourceID);
                }
 
                @VisibleForTesting
                int size() {
-                       return allocatedSlots.size();
+                       return allocatedSlotsById.size();
                }
        }
 
+       // 
------------------------------------------------------------------------
+
        /**
         * Organize all available slots from different points of view.
         */
-       static class AvailableSlots {
+       private static class AvailableSlots {
 
                /** All available slots organized by TaskManager */
-               private final Map<ResourceID, Set<SlotDescriptor>> 
availableSlotsByResource;
+               private final HashMap<ResourceID, Set<AllocatedSlot>> 
availableSlotsByTaskManager;
+
+               /** All available slots organized by host */
+               private final HashMap<String, Set<AllocatedSlot>> 
availableSlotsByHost;
 
-               /** All available slots */
-               private final Set<SlotDescriptor> availableSlots;
+               /** The available slots, with the time when they were inserted 
*/
+               private final HashMap<AllocationID, SlotAndTimestamp> 
availableSlots;
 
                AvailableSlots() {
-                       this.availableSlotsByResource = new HashMap<>();
-                       this.availableSlots = new HashSet<>();
+                       this.availableSlotsByTaskManager = new HashMap<>();
+                       this.availableSlotsByHost = new HashMap<>();
+                       this.availableSlots = new HashMap<>();
                }
 
                /**
-                * Add an available slot.
+                * Adds an available slot.
                 *
-                * @param descriptor The descriptor of the slot
+                * @param slot The slot to add
                 */
-               void add(final SlotDescriptor descriptor) {
-                       availableSlots.add(descriptor);
-
-                       final ResourceID resourceID = 
descriptor.getTaskManagerLocation().getResourceID();
-                       Set<SlotDescriptor> slotsForResource = 
availableSlotsByResource.get(resourceID);
-                       if (slotsForResource == null) {
-                               slotsForResource = new HashSet<>();
-                               availableSlotsByResource.put(resourceID, 
slotsForResource);
+               void add(final AllocatedSlot slot, final long timestamp) {
+                       checkNotNull(slot);
+
+                       SlotAndTimestamp previous = availableSlots.put(
+                                       slot.getSlotAllocationId(), new 
SlotAndTimestamp(slot, timestamp));
+
+                       if (previous == null) {
+                               final ResourceID resourceID = 
slot.getTaskManagerLocation().getResourceID();
+                               final String host = 
slot.getTaskManagerLocation().getFQDNHostname();
+
+                               Set<AllocatedSlot> slotsForTaskManager = 
availableSlotsByTaskManager.get(resourceID);
+                               if (slotsForTaskManager == null) {
+                                       slotsForTaskManager = new HashSet<>();
+                                       
availableSlotsByTaskManager.put(resourceID, slotsForTaskManager);
+                               }
+                               slotsForTaskManager.add(slot);
+
+                               Set<AllocatedSlot> slotsForHost = 
availableSlotsByHost.get(host);
+                               if (slotsForHost == null) {
+                                       slotsForHost = new HashSet<>();
+                                       availableSlotsByHost.put(host, 
slotsForHost);
+                               }
+                               slotsForHost.add(slot);
+                       }
+                       else {
+                               throw new IllegalStateException("slot already 
contained");
                        }
-                       slotsForResource.add(descriptor);
                }
 
                /**
-                * Check whether we have this slot
-                *
-                * @param slotDescriptor The descriptor of the slot
-                * @return True if we contains this slot
+                * Check whether we have this slot.
                 */
-               boolean contains(final SlotDescriptor slotDescriptor) {
-                       return availableSlots.contains(slotDescriptor);
+               boolean contains(AllocationID slotId) {
+                       return availableSlots.containsKey(slotId);
                }
 
                /**
-                * Poll a slot which matches the required resource profile
+                * Poll a slot which matches the required resource profile. The 
polling tries to satisfy the
+                * location preferences, by TaskManager and by host.
                 *
-                * @param resourceProfile The required resource profile
+                * @param resourceProfile      The required resource profile.
+                * @param locationPreferences  The location preferences, in 
order to be checked.
+                * 
                 * @return Slot which matches the resource profile, null if we 
can't find a match
                 */
-               SlotDescriptor poll(final ResourceProfile resourceProfile) {
-                       for (SlotDescriptor slotDescriptor : availableSlots) {
-                               if 
(slotDescriptor.getResourceProfile().isMatching(resourceProfile)) {
-                                       remove(slotDescriptor);
-                                       return slotDescriptor;
+               SlotAndLocality poll(ResourceProfile resourceProfile, 
Iterable<TaskManagerLocation> locationPreferences) {
+                       // fast path if no slots are available
+                       if (availableSlots.isEmpty()) {
+                               return null;
+                       }
+
+                       boolean hadLocationPreference = false;
+
+                       if (locationPreferences != null) {
+
+                               // first search by TaskManager
+                               for (TaskManagerLocation location : 
locationPreferences) {
+                                       hadLocationPreference = true;
+
+                                       final Set<AllocatedSlot> onTaskManager 
= availableSlotsByTaskManager.get(location.getResourceID());
+                                       if (onTaskManager != null) {
+                                               for (AllocatedSlot candidate : 
onTaskManager) {
+                                                       if 
(candidate.getResourceProfile().isMatching(resourceProfile)) {
+                                                               
remove(candidate.getSlotAllocationId());
+                                                               return new 
SlotAndLocality(candidate, Locality.LOCAL);
+                                                       }
+                                               }
+                                       }
+                               }
+
+                               // now, search by host
+                               for (TaskManagerLocation location : 
locationPreferences) {
+                                       final Set<AllocatedSlot> onHost = 
availableSlotsByHost.get(location.getFQDNHostname());
+                                       if (onHost != null) {
+                                               for (AllocatedSlot candidate : 
onHost) {
+                                                       if 
(candidate.getResourceProfile().isMatching(resourceProfile)) {
+                                                               
remove(candidate.getSlotAllocationId());
+                                                               return new 
SlotAndLocality(candidate, Locality.HOST_LOCAL);
+                                                       }
+                                               }
+                                       }
+                               }
+                       }
+
+                       // take any slot
+                       for (SlotAndTimestamp candidate : 
availableSlots.values()) {
+                               final AllocatedSlot slot = candidate.slot();
+
+                               if 
(slot.getResourceProfile().isMatching(resourceProfile)) {
+                                       remove(slot.getSlotAllocationId());
+                                       return new SlotAndLocality(
+                                                       slot, 
hadLocationPreference ? Locality.NON_LOCAL : Locality.UNCONSTRAINED);
                                }
                        }
+
+                       // nothing available that matches
                        return null;
                }
 
                /**
                 * Remove all available slots come from specified TaskManager.
                 *
-                * @param resourceID The id of the TaskManager
+                * @param taskManager The id of the TaskManager
                 */
-               void removeByResource(final ResourceID resourceID) {
-                       final Set<SlotDescriptor> slotsForResource = 
availableSlotsByResource.remove(resourceID);
-                       if (slotsForResource != null) {
-                               for (SlotDescriptor slotDescriptor : 
slotsForResource) {
-                                       availableSlots.remove(slotDescriptor);
+               void removeAllForTaskManager(final ResourceID taskManager) {
+                       // remove from the by-TaskManager view
+                       final Set<AllocatedSlot> slotsForTm = 
availableSlotsByTaskManager.remove(taskManager);
+
+                       if (slotsForTm != null && slotsForTm.size() > 0) {
+                               final String host = 
slotsForTm.iterator().next().getTaskManagerLocation().getFQDNHostname();
+                               final Set<AllocatedSlot> slotsForHost = 
availableSlotsByHost.get(host);
+
+                               // remove from the base set and the by-host view
+                               for (AllocatedSlot slot : slotsForTm) {
+                                       
availableSlots.remove(slot.getSlotAllocationId());
+                                       slotsForHost.remove(slot);
+                               }
+
+                               if (slotsForHost.isEmpty()) {
+                                       availableSlotsByHost.remove(host);
                                }
                        }
                }
 
-               private void remove(final SlotDescriptor slotDescriptor) {
-                       availableSlots.remove(slotDescriptor);
+               boolean tryRemove(AllocationID slotId) {
+                       final SlotAndTimestamp sat = 
availableSlots.remove(slotId);
+                       if (sat != null) {
+                               final AllocatedSlot slot = sat.slot();
+                               final ResourceID resourceID = 
slot.getTaskManagerLocation().getResourceID();
+                               final String host = 
slot.getTaskManagerLocation().getFQDNHostname();
+
+                               final Set<AllocatedSlot> slotsForTm = 
availableSlotsByTaskManager.get(resourceID);
+                               final Set<AllocatedSlot> slotsForHost = 
availableSlotsByHost.get(host);
+
+                               slotsForTm.remove(slot);
+                               slotsForHost.remove(slot);
+
+                               if (slotsForTm.isEmpty()) {
+                                       
availableSlotsByTaskManager.remove(resourceID);
+                               }
+                               if (slotsForHost.isEmpty()) {
+                                       availableSlotsByHost.remove(host);
+                               }
+
+                               return true;
+                       }
+                       else {
+                               return false;
+                       }
+               }
 
-                       final ResourceID resourceID = 
slotDescriptor.getTaskManagerLocation().getResourceID();
-                       final Set<SlotDescriptor> slotsForResource = 
checkNotNull(availableSlotsByResource.get(resourceID));
-                       slotsForResource.remove(slotDescriptor);
-                       if (slotsForResource.isEmpty()) {
-                               availableSlotsByResource.remove(resourceID);
+               private void remove(AllocationID slotId) throws 
IllegalStateException {
+                       if (!tryRemove(slotId)) {
+                               throw new IllegalStateException("slot not 
contained");
                        }
                }
 
                @VisibleForTesting
-               boolean containResource(final ResourceID resourceID) {
-                       return availableSlotsByResource.containsKey(resourceID);
+               boolean containsTaskManager(ResourceID resourceID) {
+                       return 
availableSlotsByTaskManager.containsKey(resourceID);
                }
 
                @VisibleForTesting
                int size() {
                        return availableSlots.size();
                }
+
+               @VisibleForTesting
+               void clear() {
+                       availableSlots.clear();
+                       availableSlotsByTaskManager.clear();
+                       availableSlotsByHost.clear();
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * An implementation of the {@link SlotOwner} and {@link SlotProvider} 
interfaces
+        * that delegates methods as RPC calls to the SlotPool's RPC gateway.
+        */
+       private static class ProviderAndOwner implements SlotOwner, 
SlotProvider {
+
+               /** The gateway to which all calls are forwarded */
+               private final SlotPoolGateway gateway;
+
+               /** The timeout passed along with every slot allocation request */
+               private final Time timeout;
+
+               ProviderAndOwner(SlotPoolGateway gateway, Time timeout) {
+                       this.gateway = gateway;
+                       this.timeout = timeout;
+               }
+
+               /**
+                * Forwards the slot to the gateway. The gateway method returns nothing,
+                * so this method reports true unconditionally.
+                */
+               @Override
+               public boolean returnAllocatedSlot(Slot slot) {
+                       gateway.returnAllocatedSlot(slot);
+                       return true;
+               }
+
+               /**
+                * Requests a slot through the gateway, always with
+                * {@link ResourceProfile#UNKNOWN} and an empty list of location preferences.
+                * The allowQueued flag is not forwarded to the gateway.
+                */
+               @Override
+               public Future<SimpleSlot> allocateSlot(ScheduledUnit task, 
boolean allowQueued) {
+                       return gateway.allocateSlot(
+                                       task, ResourceProfile.UNKNOWN, 
Collections.<TaskManagerLocation>emptyList(), timeout);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A pending request for a slot
+        */
+       private static class PendingRequest {
+
+               private final AllocationID allocationID;
+
+               private final FlinkCompletableFuture<SimpleSlot> future;
+
+               private final ResourceProfile resourceProfile;
+
+               PendingRequest(
+                               AllocationID allocationID,
+                               FlinkCompletableFuture<SimpleSlot> future,
+                               ResourceProfile resourceProfile) {
+                       this.allocationID = allocationID;
+                       this.future = future;
+                       this.resourceProfile = resourceProfile;
+               }
+
+               public AllocationID allocationID() {
+                       return allocationID;
+               }
+
+               public FlinkCompletableFuture<SimpleSlot> future() {
+                       return future;
+               }
+
+               public ResourceProfile resourceProfile() {
+                       return resourceProfile;
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+
+       /**
+        * A slot, together with the timestamp when it was added
+        */
+       private static class SlotAndTimestamp {
+
+               private final AllocatedSlot slot;
+
+               private final long timestamp;
+
+               SlotAndTimestamp(
+                               AllocatedSlot slot,
+                               long timestamp) {
+                       this.slot = slot;
+                       this.timestamp = timestamp;
+               }
+
+               public AllocatedSlot slot() {
+                       return slot;
+               }
+
+               public long timestamp() {
+                       return timestamp;
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
new file mode 100644
index 0000000..42942ca
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+
+import java.util.UUID;
+
+/**
+ * The gateway for calls on the {@link SlotPool}. 
+ */
+public interface SlotPoolGateway extends RpcGateway {
+
+       // 
------------------------------------------------------------------------
+       //  shutdown
+       // 
------------------------------------------------------------------------
+
+       void suspend();
+
+       // 
------------------------------------------------------------------------
+       //  resource manager connection
+       // 
------------------------------------------------------------------------
+
+       /**
+        * Connects the SlotPool to the given ResourceManager. After this 
method is called, the
+        * SlotPool will be able to request resources from the given 
ResourceManager.
+        * 
+        * @param resourceManagerLeaderId The leader session ID of the resource 
manager.
+        * @param resourceManagerGateway  The RPC gateway for the resource 
manager.
+        */
+       void connectToResourceManager(UUID resourceManagerLeaderId, 
ResourceManagerGateway resourceManagerGateway);
+
+       /**
+        * Disconnects the slot pool from its current Resource Manager. After 
this call, the pool will not
+        * be able to request further slots from the Resource Manager, and all 
currently pending requests
+        * to the resource manager will be canceled.
+        * 
+        * <p>The slot pool will still be able to serve slots from its internal 
pool.
+        */
+       void disconnectResourceManager();
+
+       // 
------------------------------------------------------------------------
+       //  registering / un-registering TaskManagers and slots
+       // 
------------------------------------------------------------------------
+
+       void registerTaskManager(ResourceID resourceID);
+
+       void releaseTaskManager(ResourceID resourceID);
+
+       Future<Boolean> offerSlot(AllocatedSlot slot);
+
+       Future<Iterable<SlotOffer>> offerSlots(Iterable<Tuple2<AllocatedSlot, 
SlotOffer>> offers);
+       
+       void failAllocation(AllocationID allocationID, Exception cause);
+
+       // 
------------------------------------------------------------------------
+       //  allocating and disposing slots
+       // 
------------------------------------------------------------------------
+
+       Future<SimpleSlot> allocateSlot(
+                       ScheduledUnit task,
+                       ResourceProfile resources,
+                       Iterable<TaskManagerLocation> locationPreferences,
+                       @RpcTimeout Time timeout);
+
+       void returnAllocatedSlot(Slot slot);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java
index ec6e9b1..0ef2482 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/Locality.java
@@ -19,19 +19,19 @@
 package org.apache.flink.runtime.jobmanager.scheduler;
 
 public enum Locality {
-       
-       /**
-        * No constraint existed on the task placement.
-        */
+
+       /** No constraint existed on the task placement. */
        UNCONSTRAINED,
-       
-       /**
-        * The task was scheduled respecting its locality preferences.
-        */
+
+       /** The task was scheduled into the same TaskManager as requested */
        LOCAL,
-       
-       /**
-        * The task was scheduled to a destination not included in its locality 
preferences.
-        */
-       NON_LOCAL
+
+       /** The task was scheduled onto the same host as requested */
+       HOST_LOCAL,
+
+       /** The task was scheduled to a destination not included in its 
locality preferences. */
+       NON_LOCAL,
+
+       /** No locality information was provided, it is unknown if the locality 
was respected */
+       UNKNOWN
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
index e45747b..546f31f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/NoResourceAvailableException.java
@@ -54,7 +54,11 @@ public class NoResourceAvailableException extends 
JobException {
        public NoResourceAvailableException(String message) {
                super(message);
        }
-       
+
+       public NoResourceAvailableException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
        // 
--------------------------------------------------------------------------------------------
        
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index 9419ab4..f477c49 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.jobmanager.slots;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -54,6 +56,9 @@ public class AllocatedSlot {
        /** TEMP until the new RPC is in place: The actor gateway to 
communicate with the TaskManager */
        private final TaskManagerGateway taskManagerGateway;
 
+       /** RPC gateway to call the TaskManager that holds this slot */
+       private final TaskExecutorGateway taskExecutorGateway;
+
        /** The number of the slot on the TaskManager to which slot belongs. 
Purely informational. */
        private final int slotNumber;
 
@@ -73,15 +78,23 @@ public class AllocatedSlot {
                this.slotNumber = slotNumber;
                this.resourceProfile = checkNotNull(resourceProfile);
                this.taskManagerGateway = checkNotNull(taskManagerGateway);
+               this.taskExecutorGateway = null;
        }
 
-       public AllocatedSlot(AllocatedSlot other) {
-               this.slotAllocationId = other.slotAllocationId;
-               this.jobID = other.jobID;
-               this.taskManagerLocation = other.taskManagerLocation;
-               this.slotNumber = other.slotNumber;
-               this.resourceProfile = other.resourceProfile;
-               this.taskManagerGateway = other.taskManagerGateway;
+       public AllocatedSlot(
+                       AllocationID slotAllocationId,
+                       JobID jobID,
+                       TaskManagerLocation location,
+                       int slotNumber,
+                       ResourceProfile resourceProfile,
+                       TaskExecutorGateway taskExecutorGateway) {
+               this.slotAllocationId = checkNotNull(slotAllocationId);
+               this.jobID = checkNotNull(jobID);
+               this.taskManagerLocation = checkNotNull(location);
+               this.slotNumber = slotNumber;
+               this.resourceProfile = checkNotNull(resourceProfile);
+               this.taskManagerGateway = null;
+               this.taskExecutorGateway = checkNotNull(taskExecutorGateway);
        }
 
        // 
------------------------------------------------------------------------
@@ -96,6 +109,17 @@ public class AllocatedSlot {
        }
 
        /**
+        * Gets the ID of the TaskManager on which this slot was allocated.
+        * 
+        * <p>This is equivalent to {@code getTaskManagerLocation().getResourceID()}.
+        * 
+        * @return This slot's TaskManager's ID.
+        */
+       public ResourceID getTaskManagerId() {
+               return getTaskManagerLocation().getResourceID();
+       }
+
+       /**
         * Returns the ID of the job this allocated slot belongs to.
         *
         * @return the ID of the job this allocated slot belongs to
@@ -142,8 +166,28 @@ public class AllocatedSlot {
                return taskManagerGateway;
        }
 
+       public TaskExecutorGateway getTaskExecutorGateway() {
+               return taskExecutorGateway;
+       }
+
        // 
------------------------------------------------------------------------
 
+       /**
+        * This always returns a reference hash code.
+        */
+       @Override
+       public final int hashCode() {
+               return super.hashCode();
+       }
+
+       /**
+        * This always checks based on reference equality.
+        */
+       @Override
+       public final boolean equals(Object obj) {
+               return this == obj;
+       }
+
        @Override
        public String toString() {
                return "AllocatedSlot " + slotAllocationId + " @ " + 
taskManagerLocation + " - " + slotNumber;

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
deleted file mode 100644
index 5655fc2..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/PooledSlotProvider.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.jobmanager.slots;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotPool;
-import org.apache.flink.runtime.instance.SlotProvider;
-import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
-
-/**
- * A simple pool based slot provider with {@link SlotPool} as the underlying 
storage.
- */
-public class PooledSlotProvider implements SlotProvider {
-
-       /** The pool which holds all the slots. */
-       private final SlotPool slotPool;
-
-       /** The timeout for allocation. */
-       private final Time timeout;
-
-       public PooledSlotProvider(final SlotPool slotPool, final Time timeout) {
-               this.slotPool = slotPool;
-               this.timeout = timeout;
-       }
-
-       @Override
-       public Future<SimpleSlot> allocateSlot(ScheduledUnit task,
-                       boolean allowQueued) throws NoResourceAvailableException
-       {
-               checkNotNull(task);
-
-               final JobID jobID = 
task.getTaskToExecute().getVertex().getJobId();
-               final Future<SimpleSlot> future = 
slotPool.allocateSimpleSlot(jobID, ResourceProfile.UNKNOWN);
-               try {
-                       final SimpleSlot slot = future.get(timeout.getSize(), 
timeout.getUnit());
-                       return FlinkCompletableFuture.completed(slot);
-               } catch (InterruptedException e) {
-                       throw new NoResourceAvailableException("Could not 
allocate a slot because it's interrupted.");
-               } catch (ExecutionException e) {
-                       throw new NoResourceAvailableException("Could not 
allocate a slot because some error occurred " +
-                                       "during allocation, " + e.getMessage());
-               } catch (TimeoutException e) {
-                       throw new NoResourceAvailableException("Could not 
allocate a slot within time limit: " + timeout);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
new file mode 100644
index 0000000..3fe5346
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/SlotAndLocality.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager.slots;
+
+import org.apache.flink.runtime.jobmanager.scheduler.Locality;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A combination of a {@link AllocatedSlot} and a {@link Locality}.
+ */
+public class SlotAndLocality {
+
+       private final AllocatedSlot slot;
+
+       private final Locality locality;
+
+       public SlotAndLocality(AllocatedSlot slot, Locality locality) {
+               this.slot = checkNotNull(slot);
+               this.locality = checkNotNull(locality);
+       }
+
+       // 
------------------------------------------------------------------------
+
+       public AllocatedSlot slot() {
+               return slot;
+       }
+
+       public Locality locality() {
+               return locality;
+       }
+
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public String toString() {
+               return "Slot: " + slot + " (" + locality + ')';
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 0b3b68e..a620390 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -53,8 +53,8 @@ import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
 import org.apache.flink.runtime.instance.Slot;
-import org.apache.flink.runtime.instance.SlotDescriptor;
 import org.apache.flink.runtime.instance.SlotPool;
+import org.apache.flink.runtime.instance.SlotPoolGateway;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -62,7 +62,7 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
-import org.apache.flink.runtime.jobmanager.slots.PooledSlotProvider;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmaster.message.ClassloadingProps;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
 import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -91,19 +91,18 @@ import 
org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.util.SerializedThrowable;
 import org.apache.flink.util.InstantiationUtil;
+
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -160,7 +159,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
        private final SlotPool slotPool;
 
-       private final Time allocationTimeout;
+       private final SlotPoolGateway slotPoolGateway;
 
        private volatile UUID leaderSessionID;
 
@@ -249,8 +248,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                // register self as job status change listener
                executionGraph.registerJobStatusListener(new 
JobManagerJobStatusListener());
 
-               this.slotPool = new SlotPool(executorService);
-               this.allocationTimeout = Time.of(5, TimeUnit.SECONDS);
+               this.slotPool = new SlotPool(rpcService, jobGraph.getJobID());
+               this.slotPoolGateway = slotPool.getSelf();
 
                this.registeredTaskManagers = new HashMap<>(4);
        }
@@ -272,10 +271,6 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
         */
        public void start(final UUID leaderSessionID) throws Exception {
                if (LEADER_ID_UPDATER.compareAndSet(this, null, 
leaderSessionID)) {
-
-                       // make sure the slot pool now accepts messages for 
this leader  
-                       slotPool.setJobManagerLeaderId(leaderSessionID);
-
                        // make sure we receive RPC and async calls
                        super.start();
 
@@ -305,8 +300,18 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
        @RpcMethod
        public void startJobExecution() {
+               // double check that the leader status did not change
+               if (leaderSessionID == null) {
+                       log.info("Aborting job startup - JobManager lost leader 
status");
+                       return;
+               }
+
                log.info("Starting execution of job {} ({})", 
jobGraph.getName(), jobGraph.getJobID());
 
+               // start the slot pool and make sure it now accepts 
messages for this leader
+               log.debug("Staring SlotPool component");
+               slotPool.start(leaderSessionID);
+
                try {
                        // job is ready to go, try to establish connection with 
resource manager
                        //   - activate leader retrieval for the resource 
manager
@@ -328,7 +333,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        @Override
                        public void run() {
                                try {
-                                       executionGraph.scheduleForExecution(new 
PooledSlotProvider(slotPool, allocationTimeout));
+                                       
executionGraph.scheduleForExecution(slotPool.getSlotProvider());
                                }
                                catch (Throwable t) {
                                        executionGraph.fail(t);
@@ -353,27 +358,26 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        return;
                }
 
-               // receive no more messages until started again, should be 
called before we clear self leader id
-               ((StartStoppable) getSelf()).stop();
-
+               // not leader any more - should not accept any leader messages 
any more
                leaderSessionID = null;
-               slotPool.setJobManagerLeaderId(null);
-               executionGraph.suspend(cause);
 
-               // disconnect from resource manager:
                try {
                        resourceManagerLeaderRetriever.stop();
-               } catch (Exception e) {
-                       log.warn("Failed to stop resource manager leader 
retriever when suspending.", e);
+               } catch (Throwable t) {
+                       log.warn("Failed to stop resource manager leader 
retriever when suspending.", t);
                }
-               closeResourceManagerConnection();
 
-               // TODO: in the future, the slot pool should not release the 
resources, so that
-               // TODO: the TaskManagers offer the resources to the new leader 
-               for (ResourceID taskManagerId : 
registeredTaskManagers.keySet()) {
-                       slotPool.releaseResource(taskManagerId);
-               }
-               registeredTaskManagers.clear();
+               // tell the execution graph (JobManager is still processing 
messages here) 
+               executionGraph.suspend(cause);
+
+               // receive no more messages until started again (the leader 
id has already been cleared above)
+               ((StartStoppable) getSelf()).stop();
+
+               // the slot pool stops receiving messages and clears its pooled 
slots 
+               slotPoolGateway.suspend();
+
+               // disconnect from resource manager:
+               closeResourceManagerConnection();
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -452,6 +456,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                }
        }
 
+       @RpcMethod
        public ExecutionState requestPartitionState(
                        final UUID leaderSessionID,
                        final IntermediateDataSetID intermediateResultId,
@@ -624,9 +629,11 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
        }
 
        @RpcMethod
-       public Iterable<SlotOffer> offerSlots(final ResourceID taskManagerId,
-                       final Iterable<SlotOffer> slots, final UUID leaderId) 
throws Exception
-       {
+       public Future<Iterable<SlotOffer>> offerSlots(
+                       final ResourceID taskManagerId,
+                       final Iterable<SlotOffer> slots,
+                       final UUID leaderId) throws Exception {
+
                validateLeaderSessionId(leaderSessionID);
 
                Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = 
registeredTaskManagers.get(taskManagerId);
@@ -634,20 +641,22 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        throw new Exception("Unknown TaskManager " + 
taskManagerId);
                }
 
-               final Set<SlotOffer> acceptedSlotOffers = new HashSet<>(4);
+               final JobID jid = jobGraph.getJobID();
+               final TaskManagerLocation taskManagerLocation = taskManager.f0;
+               final TaskExecutorGateway taskManagerGateway = taskManager.f1;
+
+               final ArrayList<Tuple2<AllocatedSlot, SlotOffer>> 
slotsAndOffers = new ArrayList<>();
+
                for (SlotOffer slotOffer : slots) {
-                       final SlotDescriptor slotDescriptor = new 
SlotDescriptor(
-                                       jobGraph.getJobID(),
-                                       taskManager.f0,
-                                       slotOffer.getSlotIndex(),
-                                       slotOffer.getResourceProfile(),
-                                       null); // TODO: replace the actor 
gateway with the new rpc gateway, it's ready (taskManager.f1)
-                       if (slotPool.offerSlot(slotOffer.getAllocationId(), 
slotDescriptor)) {
-                               acceptedSlotOffers.add(slotOffer);
-                       }
+                       final AllocatedSlot slot = new AllocatedSlot(
+                                       slotOffer.getAllocationId(), jid, 
taskManagerLocation,
+                                       slotOffer.getSlotIndex(), 
slotOffer.getResourceProfile(),
+                                       taskManagerGateway);
+
+                       slotsAndOffers.add(new Tuple2<>(slot, slotOffer));
                }
 
-               return acceptedSlotOffers;
+               return slotPoolGateway.offerSlots(slotsAndOffers);
        }
 
        @RpcMethod
@@ -662,7 +671,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        throw new Exception("Unknown TaskManager " + 
taskManagerId);
                }
 
-               slotPool.failAllocation(allocationId, cause);
+               slotPoolGateway.failAllocation(allocationId, cause);
        }
 
        @RpcMethod
@@ -708,7 +717,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                                                return new 
RegistrationResponse.Decline("Invalid leader session id");
                                        }
 
-                                       
slotPool.registerResource(taskManagerId);
+                                       
slotPoolGateway.registerTaskManager(taskManagerId);
                                        
registeredTaskManagers.put(taskManagerId, Tuple2.of(taskManagerLocation, 
taskExecutorGateway));
                                        return new 
JMTMRegistrationSuccess(taskManagerId, libraryCacheManager.getBlobServerPort());
                                }
@@ -840,7 +849,7 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        log.info("JobManager successfully registered at 
ResourceManager, leader id: {}.",
                                        success.getResourceManagerLeaderId());
 
-                       slotPool.setResourceManager(
+                       slotPoolGateway.connectToResourceManager(
                                        success.getResourceManagerLeaderId(), 
resourceManagerConnection.getTargetGateway());
                }
        }
@@ -852,7 +861,8 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        resourceManagerConnection.close();
                        resourceManagerConnection = null;
                }
-               slotPool.disconnectResourceManager();
+
+               slotPoolGateway.disconnectResourceManager();
        }
 
        private void validateLeaderSessionId(UUID leaderSessionID) throws 
LeaderIdMismatchException {

http://git-wip-us.apache.org/repos/asf/flink/blob/a1ba9f11/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index b971b96..f30e345 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -25,6 +25,7 @@ import org.apache.flink.util.ReflectionUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nonnull;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
@@ -207,6 +208,17 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
         * @param runnable Runnable to be executed
         * @param delay    The delay after which the runnable will be executed
         */
+       protected void scheduleRunAsync(Runnable runnable, Time delay) {
+               scheduleRunAsync(runnable, delay.getSize(), delay.getUnit());
+       }
+
+       /**
+        * Execute the runnable in the main thread of the underlying RPC 
endpoint, with
+        * a delay of the given length, interpreted in the given time unit.
+        *
+        * @param runnable Runnable to be executed
+        * @param delay    The delay after which the runnable will be executed
+        */
        protected void scheduleRunAsync(Runnable runnable, long delay, TimeUnit 
unit) {
                ((MainThreadExecutable) self).scheduleRunAsync(runnable, 
unit.toMillis(delay));
        }
@@ -255,7 +267,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
        /**
         * Executor which executes runnables in the main thread context.
         */
-       private class MainThreadExecutor implements Executor {
+       private static class MainThreadExecutor implements Executor {
 
                private final MainThreadExecutable gateway;
 
@@ -264,7 +276,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
                }
 
                @Override
-               public void execute(Runnable runnable) {
+               public void execute(@Nonnull Runnable runnable) {
                        gateway.runAsync(runnable);
                }
        }
@@ -277,7 +289,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
        private Class<C> determineSelfGatewayType() {
 
                // determine self gateway type
-               Class c = getClass();
+               Class<?> c = getClass();
                Class<C> determinedSelfGatewayType;
                do {
                        determinedSelfGatewayType = 
ReflectionUtil.getTemplateType1(c);

Reply via email to