[FLINK-4987] Add RpcTaskManagerGateway implementation; Port AllocatedSlotsTest, 
AvailableSlotsTest and SlotPoolTest

The RpcTaskManagerGateway is the TaskManagerGateway of Flink's new RPC 
abstraction. It basically forwards all calls to the underlying 
TaskExecutorGateway.

Moreover, this PR enables the disabled tests AllocatedSlotsTest, 
AvailableSlotsTest and SlotPoolTest.

Add license header to RpcTaskManagerGateway

Fix ExecutionGraphMetricsTest


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/522edae3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/522edae3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/522edae3

Branch: refs/heads/master
Commit: 522edae3c767ed178691a4b4f93e676fdf2898fb
Parents: a1ba9f1
Author: Till Rohrmann <[email protected]>
Authored: Tue Nov 1 03:03:23 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Fri Dec 23 20:54:25 2016 +0100

----------------------------------------------------------------------
 .../flink/runtime/executiongraph/Execution.java |   1 +
 .../runtime/executiongraph/ExecutionVertex.java |   3 +-
 .../apache/flink/runtime/instance/SlotPool.java |  19 +-
 .../runtime/jobmanager/slots/AllocatedSlot.java |  31 +-
 .../flink/runtime/jobmaster/JobMaster.java      |  28 +-
 .../jobmaster/RpcTaskManagerGateway.java        | 142 +++++
 .../resourcemanager/ResourceManager.java        |   5 +
 .../taskexecutor/TaskExecutorGateway.java       |  12 +-
 .../ExecutionGraphMetricsTest.java              |   8 +-
 .../ExecutionVertexDeploymentTest.java          |   7 +
 .../runtime/instance/AllocatedSlotsTest.java    | 275 +++++----
 .../runtime/instance/AvailableSlotsTest.java    | 244 ++++----
 .../flink/runtime/instance/SlotPoolTest.java    | 608 ++++++++++---------
 .../runtime/minicluster/MiniClusterITCase.java  |   4 +-
 .../runtime/rpc/TestingSerialRpcService.java    |   5 +-
 15 files changed, 783 insertions(+), 609 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 16aebce..18a4445 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -298,6 +298,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        });
 
                        // if tasks have to scheduled immediately check that 
the task has been deployed
+                       // TODO: This might be problematic if the future is not 
completed right away
                        if (!queued) {
                                if (!deploymentFuture.isDone()) {
                                        markFailed(new 
IllegalArgumentException("The slot allocation future has not been completed 
yet."));

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index a1c4d8b..3b5a6cc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.Archiveable;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.JobException;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -627,7 +626,7 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
                        serializedJobInformation,
                        serializedJobVertexInformation,
                        executionId,
-                       new AllocationID(), // TODO: Obtain the proper 
allocation id from the slot
+                       targetSlot.getAllocatedSlot().getSlotAllocationId(),
                        subTaskIndex,
                        attemptNumber,
                        targetSlot.getRoot().getSlotNumber(),

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 5a3a321..05884d1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -172,7 +172,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
         *
         * @param jobManagerLeaderId The necessary leader id for running the 
job.
         */
-       public void start(UUID jobManagerLeaderId) {
+       public void start(UUID jobManagerLeaderId) throws Exception {
                this.jobManagerLeaderId = jobManagerLeaderId;
 
                // TODO - start should not throw an exception
@@ -295,7 +295,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
                pendingRequests.put(allocationID, new 
PendingRequest(allocationID, future, resources));
 
                Future<RMSlotRequestReply> rmResponse = 
resourceManagerGateway.requestSlot(
-                               resourceManagerLeaderId, jobManagerLeaderId,
+                               jobManagerLeaderId, resourceManagerLeaderId,
                                new SlotRequest(jobId, allocationID, resources),
                                resourceManagerRequestsTimeout);
 
@@ -579,7 +579,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
        /**
         * Organize allocated slots from different points of view.
         */
-       private static class AllocatedSlots {
+       static class AllocatedSlots {
 
                /** All allocated slots organized by TaskManager's id */
                private final Map<ResourceID, Set<Slot>> 
allocatedSlotsByTaskManager;
@@ -650,7 +650,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
                                Set<Slot> slotsForTM = 
allocatedSlotsByTaskManager.get(taskManagerId);
                                slotsForTM.remove(slot);
                                if (slotsForTM.isEmpty()) {
-                                       
allocatedSlotsByTaskManager.get(taskManagerId);
+                                       
allocatedSlotsByTaskManager.remove(taskManagerId);
                                }
                                return slot;
                        }
@@ -692,6 +692,15 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> 
{
                int size() {
                        return allocatedSlotsById.size();
                }
+
+               @VisibleForTesting
+               Set<Slot> getSlotsForTaskManager(ResourceID resourceId) {
+                       if 
(allocatedSlotsByTaskManager.containsKey(resourceId)) {
+                               return 
allocatedSlotsByTaskManager.get(resourceId);
+                       } else {
+                               return Collections.emptySet();
+                       }
+               }
        }
 
        // 
------------------------------------------------------------------------
@@ -699,7 +708,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
        /**
         * Organize all available slots from different points of view.
         */
-       private static class AvailableSlots {
+       static class AvailableSlots {
 
                /** All available slots organized by TaskManager */
                private final HashMap<ResourceID, Set<AllocatedSlot>> 
availableSlotsByTaskManager;

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
index f477c49..a0c608d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/slots/AllocatedSlot.java
@@ -53,11 +53,8 @@ public class AllocatedSlot {
        /** The resource profile of the slot provides */
        private final ResourceProfile resourceProfile;
 
-       /** TEMP until the new RPC is in place: The actor gateway to 
communicate with the TaskManager */
-       private final TaskManagerGateway taskManagerGateway;
-
        /** RPC gateway to call the TaskManager that holds this slot */
-       private final TaskExecutorGateway taskExecutorGateway;
+       private final TaskManagerGateway taskManagerGateway;
 
        /** The number of the slot on the TaskManager to which slot belongs. 
Purely informational. */
        private final int slotNumber;
@@ -69,32 +66,14 @@ public class AllocatedSlot {
                        JobID jobID,
                        TaskManagerLocation location,
                        int slotNumber,
-                       ResourceProfile resourceProfile,
-                       TaskManagerGateway taskManagerGateway)
-       {
+                       ResourceProfile resourceProfile,                
+                       TaskManagerGateway taskManagerGateway) {
                this.slotAllocationId = checkNotNull(slotAllocationId);
                this.jobID = checkNotNull(jobID);
                this.taskManagerLocation = checkNotNull(location);
                this.slotNumber = slotNumber;
                this.resourceProfile = checkNotNull(resourceProfile);
                this.taskManagerGateway = checkNotNull(taskManagerGateway);
-               this.taskExecutorGateway = null;
-       }
-
-       public AllocatedSlot(
-                       AllocationID slotAllocationId,
-                       JobID jobID,
-                       TaskManagerLocation location,
-                       int slotNumber,
-                       ResourceProfile resourceProfile,
-                       TaskExecutorGateway taskExecutorGateway) {
-               this.slotAllocationId = checkNotNull(slotAllocationId);
-               this.jobID = checkNotNull(jobID);
-               this.taskManagerLocation = checkNotNull(location);
-               this.slotNumber = slotNumber;
-               this.resourceProfile = checkNotNull(resourceProfile);
-               this.taskManagerGateway = null;
-               this.taskExecutorGateway = checkNotNull(taskExecutorGateway);
        }
 
        // 
------------------------------------------------------------------------
@@ -166,10 +145,6 @@ public class AllocatedSlot {
                return taskManagerGateway;
        }
 
-       public TaskExecutorGateway getTaskExecutorGateway() {
-               return taskExecutorGateway;
-       }
-
        // 
------------------------------------------------------------------------
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index a620390..d6bcf2c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -308,9 +308,15 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
 
                log.info("Starting execution of job {} ({})", 
jobGraph.getName(), jobGraph.getJobID());
 
-               // start the slot pool make sure the slot pool now accepts 
messages for this leader
-               log.debug("Staring SlotPool component");
-               slotPool.start(leaderSessionID);
+               try {
+                       // start the slot pool make sure the slot pool now 
accepts messages for this leader
+                       log.debug("Staring SlotPool component");
+                       slotPool.start(leaderSessionID);
+               } catch (Exception e) {
+                       log.error("Faild to start job {} ({})", 
jobGraph.getName(), jobGraph.getJobID(), e);
+
+                       handleFatalError(new Exception("Could not start job 
execution: Failed to start the slot pool.", e));
+               }
 
                try {
                        // job is ready to go, try to establish connection with 
resource manager
@@ -634,24 +640,30 @@ public class JobMaster extends 
RpcEndpoint<JobMasterGateway> {
                        final Iterable<SlotOffer> slots,
                        final UUID leaderId) throws Exception {
 
-               validateLeaderSessionId(leaderSessionID);
+               validateLeaderSessionId(leaderId);
 
                Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManager = 
registeredTaskManagers.get(taskManagerId);
+
                if (taskManager == null) {
                        throw new Exception("Unknown TaskManager " + 
taskManagerId);
                }
 
                final JobID jid = jobGraph.getJobID();
                final TaskManagerLocation taskManagerLocation = taskManager.f0;
-               final TaskExecutorGateway taskManagerGateway = taskManager.f1;
+               final TaskExecutorGateway taskExecutorGateway = taskManager.f1;
 
                final ArrayList<Tuple2<AllocatedSlot, SlotOffer>> 
slotsAndOffers = new ArrayList<>();
 
+               final RpcTaskManagerGateway rpcTaskManagerGateway = new 
RpcTaskManagerGateway(taskExecutorGateway, leaderId);
+
                for (SlotOffer slotOffer : slots) {
                        final AllocatedSlot slot = new AllocatedSlot(
-                                       slotOffer.getAllocationId(), jid, 
taskManagerLocation,
-                                       slotOffer.getSlotIndex(), 
slotOffer.getResourceProfile(),
-                                       taskManagerGateway);
+                               slotOffer.getAllocationId(),
+                               jid,
+                               taskManagerLocation,
+                               slotOffer.getSlotIndex(),
+                               slotOffer.getResourceProfile(),
+                               rpcTaskManagerGateway);
 
                        slotsAndOffers.add(new Tuple2<>(slot, slotOffer));
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
new file mode 100644
index 0000000..eba97d2
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/RpcTaskManagerGateway.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.PartitionInfo;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.messages.StackTrace;
+import org.apache.flink.runtime.messages.StackTraceSampleResponse;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.util.Preconditions;
+
+import java.util.UUID;
+
+/**
+ * Implementation of the {@link TaskManagerGateway} for Flink's RPC system
+ */
+public class RpcTaskManagerGateway implements TaskManagerGateway {
+
+       private final TaskExecutorGateway taskExecutorGateway;
+
+       private final UUID leaderId;
+
+       public RpcTaskManagerGateway(TaskExecutorGateway taskExecutorGateway, 
UUID leaderId) {
+               this.taskExecutorGateway = 
Preconditions.checkNotNull(taskExecutorGateway);
+               this.leaderId = Preconditions.checkNotNull(leaderId);
+       }
+
+       @Override
+       public String getAddress() {
+               return taskExecutorGateway.getAddress();
+       }
+
+       @Override
+       public void disconnectFromJobManager(InstanceID instanceId, Exception 
cause) {
+//             taskExecutorGateway.disconnectFromJobManager(instanceId, cause);
+               throw new UnsupportedOperationException("Operation is not yet 
supported.");
+       }
+
+       @Override
+       public void stopCluster(ApplicationStatus applicationStatus, String 
message) {
+//             taskExecutorGateway.stopCluster(applicationStatus, message);
+               throw new UnsupportedOperationException("Operation is not yet 
supported.");
+       }
+
+       @Override
+       public Future<StackTrace> requestStackTrace(Time timeout) {
+//             return taskExecutorGateway.requestStackTrace(timeout);
+               throw new UnsupportedOperationException("Operation is not yet 
supported.");
+       }
+
+       @Override
+       public Future<StackTraceSampleResponse> requestStackTraceSample(
+                       ExecutionAttemptID executionAttemptID,
+                       int sampleId,
+                       int numSamples,
+                       Time delayBetweenSamples,
+                       int maxStackTraceDepth,
+                       Time timeout) {
+//             return taskExecutorGateway.requestStackTraceSample(
+//                     executionAttemptID,
+//                     sampleId,
+//                     numSamples,
+//                     delayBetweenSamples,
+//                     maxStackTraceDepth,
+//                     timeout);
+
+               throw new UnsupportedOperationException("Operation is not yet 
supported.");
+       }
+
+       @Override
+       public Future<Acknowledge> submitTask(TaskDeploymentDescriptor tdd, 
Time timeout) {
+               return taskExecutorGateway.submitTask(tdd, leaderId, timeout);
+       }
+
+       @Override
+       public Future<Acknowledge> stopTask(ExecutionAttemptID 
executionAttemptID, Time timeout) {
+               return taskExecutorGateway.stopTask(executionAttemptID, 
timeout);
+       }
+
+       @Override
+       public Future<Acknowledge> cancelTask(ExecutionAttemptID 
executionAttemptID, Time timeout) {
+               return taskExecutorGateway.cancelTask(executionAttemptID, 
timeout);
+       }
+
+       @Override
+       public Future<Acknowledge> updatePartitions(ExecutionAttemptID 
executionAttemptID, Iterable<PartitionInfo> partitionInfos, Time timeout) {
+               return taskExecutorGateway.updatePartitions(executionAttemptID, 
partitionInfos, timeout);
+       }
+
+       @Override
+       public void failPartition(ExecutionAttemptID executionAttemptID) {
+               taskExecutorGateway.failPartition(executionAttemptID);
+       }
+
+       @Override
+       public void notifyCheckpointComplete(ExecutionAttemptID 
executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
+//             
taskExecutorGateway.notifyCheckpointComplete(executionAttemptID, jobId, 
checkpointId, timestamp);
+               throw new UnsupportedOperationException("Operation is not yet 
supported.");
+       }
+
+       @Override
+       public void triggerCheckpoint(ExecutionAttemptID executionAttemptID, 
JobID jobId, long checkpointId, long timestamp) {
+//             taskExecutorGateway.triggerCheckpoint(executionAttemptID, 
jobId, checkpointId, timestamp);
+               throw new UnsupportedOperationException("Operation is not yet 
supported.");
+       }
+
+       @Override
+       public Future<BlobKey> requestTaskManagerLog(Time timeout) {
+//             return taskExecutorGateway.requestTaskManagerLog(timeout);
+               throw new UnsupportedOperationException("Operation is not yet 
supported.");
+       }
+
+       @Override
+       public Future<BlobKey> requestTaskManagerStdout(Time timeout) {
+//             return taskExecutorGateway.requestTaskManagerStdout(timeout);
+               throw new UnsupportedOperationException("Operation is not yet 
supported.");
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index a81c214..145cc40 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -394,6 +394,11 @@ public abstract class ResourceManager<WorkerType extends 
Serializable>
                        UUID resourceManagerLeaderID,
                        SlotRequest slotRequest) {
 
+               log.info("Request slot with profile {} for job {} with 
allocation id {}.",
+                       slotRequest.getResourceProfile(),
+                       slotRequest.getJobId(),
+                       slotRequest.getAllocationId());
+
                JobID jobId = slotRequest.getJobId();
                JobManagerRegistration jobManagerRegistration = 
jobManagerRegistrations.get(jobId);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
index 1ffc407..ebd4c0c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java
@@ -75,9 +75,13 @@ public interface TaskExecutorGateway extends RpcGateway {
         *
         * @param executionAttemptID identifying the task
         * @param partitionInfos telling where the partition can be retrieved 
from
+        * @param timeout for the update partitions operation
         * @return Future acknowledge if the partitions have been successfully 
updated
         */
-       Future<Acknowledge> updatePartitions(ExecutionAttemptID 
executionAttemptID, Iterable<PartitionInfo> partitionInfos);
+       Future<Acknowledge> updatePartitions(
+               ExecutionAttemptID executionAttemptID,
+               Iterable<PartitionInfo> partitionInfos,
+               @RpcTimeout Time timeout);
 
        /**
         * Fail all intermediate result partitions of the given task.
@@ -112,15 +116,17 @@ public interface TaskExecutorGateway extends RpcGateway {
         * Stop the given task.
         *
         * @param executionAttemptID identifying the task
+        * @param timeout for the stop operation
         * @return Future acknowledge if the task is successfully stopped
         */
-       Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID);
+       Future<Acknowledge> stopTask(ExecutionAttemptID executionAttemptID, 
@RpcTimeout Time timeout);
 
        /**
         * Cancel the given task.
         *
         * @param executionAttemptID identifying the task
+        * @param timeout for the cancel operation
         * @return Future acknowledge if the task is successfully canceled
         */
-       Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID);
+       Future<Acknowledge> cancelTask(ExecutionAttemptID executionAttemptID, 
@RpcTimeout Time timeout);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
index 0f7e75f..41e47d2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphMetricsTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -36,6 +37,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.SuppressRestartsException;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.instance.Instance;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -127,6 +129,9 @@ public class ExecutionGraphMetricsTest extends TestLogger {
 
                        Slot rootSlot = mock(Slot.class);
 
+                       AllocatedSlot mockAllocatedSlot = 
mock(AllocatedSlot.class);
+                       
when(mockAllocatedSlot.getSlotAllocationId()).thenReturn(new AllocationID());
+
                        SimpleSlot simpleSlot = mock(SimpleSlot.class);
                        when(simpleSlot.isAlive()).thenReturn(true);
                        
when(simpleSlot.getTaskManagerLocation()).thenReturn(taskManagerLocation);
@@ -134,6 +139,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
                        
when(simpleSlot.getTaskManagerGateway()).thenReturn(taskManagerGateway);
                        
when(simpleSlot.setExecutedVertex(Matchers.any(Execution.class))).thenReturn(true);
                        when(simpleSlot.getRoot()).thenReturn(rootSlot);
+                       
when(simpleSlot.getAllocatedSlot()).thenReturn(mockAllocatedSlot);
 
                        FlinkCompletableFuture<SimpleSlot> future = new 
FlinkCompletableFuture<>();
                        future.complete(simpleSlot);
@@ -175,7 +181,7 @@ public class ExecutionGraphMetricsTest extends TestLogger {
        
                        // start execution
                        executionGraph.scheduleForExecution(scheduler);
-       
+
                        assertTrue(0L == restartingTime.getValue());
        
                        List<ExecutionAttemptID> executionIDs = new 
ArrayList<>();

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
index 8bc39a7..f3db5d8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexDeploymentTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
@@ -31,6 +32,7 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.ScheduleMode;
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.Test;
 
@@ -357,10 +359,15 @@ public class ExecutionVertexDeploymentTest {
                IntermediateResult result = new IntermediateResult(new 
IntermediateDataSetID(), jobVertex, 4, ResultPartitionType.PIPELINED);
                ExecutionVertex vertex = new ExecutionVertex(jobVertex, 0, new 
IntermediateResult[]{result}, Time.minutes(1));
 
+               AllocatedSlot allocatedSlot = mock(AllocatedSlot.class);
+               when(allocatedSlot.getSlotAllocationId()).thenReturn(new 
AllocationID());
+
                Slot root = mock(Slot.class);
                when(root.getSlotNumber()).thenReturn(1);
                SimpleSlot slot = mock(SimpleSlot.class);
                when(slot.getRoot()).thenReturn(root);
+               when(slot.getAllocatedSlot()).thenReturn(allocatedSlot);
+               when(root.getAllocatedSlot()).thenReturn(allocatedSlot);
 
                for (ScheduleMode mode : ScheduleMode.values()) {
                        vertex.getExecutionGraph().setScheduleMode(mode);

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
index 33ed679..e654a99 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AllocatedSlotsTest.java
@@ -1,135 +1,140 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.runtime.instance;
-//
-//import org.apache.flink.runtime.clusterframework.types.AllocationID;
-//import org.apache.flink.runtime.clusterframework.types.ResourceID;
-//import org.junit.Test;
-//
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertFalse;
-//import static org.junit.Assert.assertNull;
-//import static org.junit.Assert.assertTrue;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.when;
-//
-//public class AllocatedSlotsTest {
-//
-//     @Test
-//     public void testOperations() throws Exception {
-//             SlotPool.AllocatedSlots allocatedSlots = new 
SlotPool.AllocatedSlots();
-//
-//             final AllocationID allocation1 = new AllocationID();
-//             final ResourceID resource1 = new ResourceID("resource1");
-//             final Slot slot1 = createSlot(resource1);
-//
-//             allocatedSlots.add(allocation1, new SlotDescriptor(slot1), 
slot1);
-//
-//             assertTrue(allocatedSlots.contains(slot1));
-//             assertTrue(allocatedSlots.containResource(resource1));
-//
-//             assertEquals(slot1, allocatedSlots.get(allocation1));
-//             assertEquals(1, 
allocatedSlots.getSlotsByResource(resource1).size());
-//             assertEquals(1, allocatedSlots.size());
-//
-//             final AllocationID allocation2 = new AllocationID();
-//             final Slot slot2 = createSlot(resource1);
-//
-//             allocatedSlots.add(allocation2, new SlotDescriptor(slot2), 
slot2);
-//
-//             assertTrue(allocatedSlots.contains(slot1));
-//             assertTrue(allocatedSlots.contains(slot2));
-//             assertTrue(allocatedSlots.containResource(resource1));
-//
-//             assertEquals(slot1, allocatedSlots.get(allocation1));
-//             assertEquals(slot2, allocatedSlots.get(allocation2));
-//             assertEquals(2, 
allocatedSlots.getSlotsByResource(resource1).size());
-//             assertEquals(2, allocatedSlots.size());
-//
-//             final AllocationID allocation3 = new AllocationID();
-//             final ResourceID resource2 = new ResourceID("resource2");
-//             final Slot slot3 = createSlot(resource2);
-//
-//             allocatedSlots.add(allocation3, new SlotDescriptor(slot2), 
slot3);
-//
-//             assertTrue(allocatedSlots.contains(slot1));
-//             assertTrue(allocatedSlots.contains(slot2));
-//             assertTrue(allocatedSlots.contains(slot3));
-//             assertTrue(allocatedSlots.containResource(resource1));
-//             assertTrue(allocatedSlots.containResource(resource2));
-//
-//             assertEquals(slot1, allocatedSlots.get(allocation1));
-//             assertEquals(slot2, allocatedSlots.get(allocation2));
-//             assertEquals(slot3, allocatedSlots.get(allocation3));
-//             assertEquals(2, 
allocatedSlots.getSlotsByResource(resource1).size());
-//             assertEquals(1, 
allocatedSlots.getSlotsByResource(resource2).size());
-//             assertEquals(3, allocatedSlots.size());
-//
-//             allocatedSlots.remove(slot2);
-//
-//             assertTrue(allocatedSlots.contains(slot1));
-//             assertFalse(allocatedSlots.contains(slot2));
-//             assertTrue(allocatedSlots.contains(slot3));
-//             assertTrue(allocatedSlots.containResource(resource1));
-//             assertTrue(allocatedSlots.containResource(resource2));
-//
-//             assertEquals(slot1, allocatedSlots.get(allocation1));
-//             assertNull(allocatedSlots.get(allocation2));
-//             assertEquals(slot3, allocatedSlots.get(allocation3));
-//             assertEquals(1, 
allocatedSlots.getSlotsByResource(resource1).size());
-//             assertEquals(1, 
allocatedSlots.getSlotsByResource(resource2).size());
-//             assertEquals(2, allocatedSlots.size());
-//
-//             allocatedSlots.remove(slot1);
-//
-//             assertFalse(allocatedSlots.contains(slot1));
-//             assertFalse(allocatedSlots.contains(slot2));
-//             assertTrue(allocatedSlots.contains(slot3));
-//             assertFalse(allocatedSlots.containResource(resource1));
-//             assertTrue(allocatedSlots.containResource(resource2));
-//
-//             assertNull(allocatedSlots.get(allocation1));
-//             assertNull(allocatedSlots.get(allocation2));
-//             assertEquals(slot3, allocatedSlots.get(allocation3));
-//             assertEquals(0, 
allocatedSlots.getSlotsByResource(resource1).size());
-//             assertEquals(1, 
allocatedSlots.getSlotsByResource(resource2).size());
-//             assertEquals(1, allocatedSlots.size());
-//
-//             allocatedSlots.remove(slot3);
-//
-//             assertFalse(allocatedSlots.contains(slot1));
-//             assertFalse(allocatedSlots.contains(slot2));
-//             assertFalse(allocatedSlots.contains(slot3));
-//             assertFalse(allocatedSlots.containResource(resource1));
-//             assertFalse(allocatedSlots.containResource(resource2));
-//
-//             assertNull(allocatedSlots.get(allocation1));
-//             assertNull(allocatedSlots.get(allocation2));
-//             assertNull(allocatedSlots.get(allocation3));
-//             assertEquals(0, 
allocatedSlots.getSlotsByResource(resource1).size());
-//             assertEquals(0, 
allocatedSlots.getSlotsByResource(resource2).size());
-//             assertEquals(0, allocatedSlots.size());
-//     }
-//
-//     private Slot createSlot(final ResourceID resourceId) {
-//             Slot slot = mock(Slot.class);
-//             when(slot.getTaskManagerID()).thenReturn(resourceId);
-//             return slot;
-//     }
-//}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AllocatedSlotsTest {
+
+       @Test
+       public void testOperations() throws Exception {
+               SlotPool.AllocatedSlots allocatedSlots = new 
SlotPool.AllocatedSlots();
+
+               final AllocationID allocation1 = new AllocationID();
+               final ResourceID resource1 = new ResourceID("resource1");
+               final Slot slot1 = createSlot(resource1, allocation1);
+
+               allocatedSlots.add(slot1);
+
+               
assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+               assertTrue(allocatedSlots.containResource(resource1));
+
+               assertEquals(slot1, allocatedSlots.get(allocation1));
+               assertEquals(1, 
allocatedSlots.getSlotsForTaskManager(resource1).size());
+               assertEquals(1, allocatedSlots.size());
+
+               final AllocationID allocation2 = new AllocationID();
+               final Slot slot2 = createSlot(resource1, allocation2);
+
+               allocatedSlots.add(slot2);
+
+               
assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+               
assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
+               assertTrue(allocatedSlots.containResource(resource1));
+
+               assertEquals(slot1, allocatedSlots.get(allocation1));
+               assertEquals(slot2, allocatedSlots.get(allocation2));
+               assertEquals(2, 
allocatedSlots.getSlotsForTaskManager(resource1).size());
+               assertEquals(2, allocatedSlots.size());
+
+               final AllocationID allocation3 = new AllocationID();
+               final ResourceID resource2 = new ResourceID("resource2");
+               final Slot slot3 = createSlot(resource2, allocation3);
+
+               allocatedSlots.add(slot3);
+
+               
assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+               
assertTrue(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
+               
assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+               assertTrue(allocatedSlots.containResource(resource1));
+               assertTrue(allocatedSlots.containResource(resource2));
+
+               assertEquals(slot1, allocatedSlots.get(allocation1));
+               assertEquals(slot2, allocatedSlots.get(allocation2));
+               assertEquals(slot3, allocatedSlots.get(allocation3));
+               assertEquals(2, 
allocatedSlots.getSlotsForTaskManager(resource1).size());
+               assertEquals(1, 
allocatedSlots.getSlotsForTaskManager(resource2).size());
+               assertEquals(3, allocatedSlots.size());
+
+               allocatedSlots.remove(slot2);
+
+               
assertTrue(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+               
assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
+               
assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+               assertTrue(allocatedSlots.containResource(resource1));
+               assertTrue(allocatedSlots.containResource(resource2));
+
+               assertEquals(slot1, allocatedSlots.get(allocation1));
+               assertNull(allocatedSlots.get(allocation2));
+               assertEquals(slot3, allocatedSlots.get(allocation3));
+               assertEquals(1, 
allocatedSlots.getSlotsForTaskManager(resource1).size());
+               assertEquals(1, 
allocatedSlots.getSlotsForTaskManager(resource2).size());
+               assertEquals(2, allocatedSlots.size());
+
+               allocatedSlots.remove(slot1);
+
+               
assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+               
assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
+               
assertTrue(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+               assertFalse(allocatedSlots.containResource(resource1));
+               assertTrue(allocatedSlots.containResource(resource2));
+
+               assertNull(allocatedSlots.get(allocation1));
+               assertNull(allocatedSlots.get(allocation2));
+               assertEquals(slot3, allocatedSlots.get(allocation3));
+               assertEquals(0, 
allocatedSlots.getSlotsForTaskManager(resource1).size());
+               assertEquals(1, 
allocatedSlots.getSlotsForTaskManager(resource2).size());
+               assertEquals(1, allocatedSlots.size());
+
+               allocatedSlots.remove(slot3);
+
+               
assertFalse(allocatedSlots.contains(slot1.getAllocatedSlot().getSlotAllocationId()));
+               
assertFalse(allocatedSlots.contains(slot2.getAllocatedSlot().getSlotAllocationId()));
+               
assertFalse(allocatedSlots.contains(slot3.getAllocatedSlot().getSlotAllocationId()));
+               assertFalse(allocatedSlots.containResource(resource1));
+               assertFalse(allocatedSlots.containResource(resource2));
+
+               assertNull(allocatedSlots.get(allocation1));
+               assertNull(allocatedSlots.get(allocation2));
+               assertNull(allocatedSlots.get(allocation3));
+               assertEquals(0, 
allocatedSlots.getSlotsForTaskManager(resource1).size());
+               assertEquals(0, 
allocatedSlots.getSlotsForTaskManager(resource2).size());
+               assertEquals(0, allocatedSlots.size());
+       }
+
+       private Slot createSlot(final ResourceID resourceId, final AllocationID 
allocationId) {
+               AllocatedSlot mockAllocatedSlot = mock(AllocatedSlot.class);
+               Slot slot = mock(Slot.class);
+               when(slot.getTaskManagerID()).thenReturn(resourceId);
+               when(slot.getAllocatedSlot()).thenReturn(mockAllocatedSlot);
+
+               
when(mockAllocatedSlot.getSlotAllocationId()).thenReturn(allocationId);
+               return slot;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
index 4d58a31..4ed88c4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/AvailableSlotsTest.java
@@ -1,123 +1,121 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.runtime.instance;
-//
-//import org.apache.flink.api.common.JobID;
-//import org.apache.flink.runtime.clusterframework.types.ResourceID;
-//import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-//import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-//import org.junit.Test;
-//
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertFalse;
-//import static org.junit.Assert.assertNull;
-//import static org.junit.Assert.assertTrue;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.when;
-//
-//public class AvailableSlotsTest {
-//
-//     static final ResourceProfile DEFAULT_TESTING_PROFILE = new 
ResourceProfile(1.0, 512);
-//
-//     static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new 
ResourceProfile(2.0, 1024);
-//
-//     @Test
-//     public void testAddAndRemove() throws Exception {
-//             SlotPool.AvailableSlots availableSlots = new 
SlotPool.AvailableSlots();
-//
-//             final ResourceID resource1 = new ResourceID("resource1");
-//             final ResourceID resource2 = new ResourceID("resource2");
-//
-//             final SlotDescriptor slot1 = createSlotDescriptor(resource1);
-//             final SlotDescriptor slot2 = createSlotDescriptor(resource1);
-//             final SlotDescriptor slot3 = createSlotDescriptor(resource2);
-//
-//             availableSlots.add(slot1);
-//             availableSlots.add(slot2);
-//             availableSlots.add(slot3);
-//
-//             assertEquals(3, availableSlots.size());
-//             assertTrue(availableSlots.contains(slot1));
-//             assertTrue(availableSlots.contains(slot2));
-//             assertTrue(availableSlots.contains(slot3));
-//             assertTrue(availableSlots.containResource(resource1));
-//             assertTrue(availableSlots.containResource(resource2));
-//
-//             availableSlots.removeByResource(resource1);
-//
-//             assertEquals(1, availableSlots.size());
-//             assertFalse(availableSlots.contains(slot1));
-//             assertFalse(availableSlots.contains(slot2));
-//             assertTrue(availableSlots.contains(slot3));
-//             assertFalse(availableSlots.containResource(resource1));
-//             assertTrue(availableSlots.containResource(resource2));
-//
-//             availableSlots.removeByResource(resource2);
-//
-//             assertEquals(0, availableSlots.size());
-//             assertFalse(availableSlots.contains(slot1));
-//             assertFalse(availableSlots.contains(slot2));
-//             assertFalse(availableSlots.contains(slot3));
-//             assertFalse(availableSlots.containResource(resource1));
-//             assertFalse(availableSlots.containResource(resource2));
-//     }
-//
-//     @Test
-//     public void testPollFreeSlot() {
-//             SlotPool.AvailableSlots availableSlots = new 
SlotPool.AvailableSlots();
-//
-//             final ResourceID resource1 = new ResourceID("resource1");
-//             final SlotDescriptor slot1 = createSlotDescriptor(resource1);
-//
-//             availableSlots.add(slot1);
-//
-//             assertEquals(1, availableSlots.size());
-//             assertTrue(availableSlots.contains(slot1));
-//             assertTrue(availableSlots.containResource(resource1));
-//
-//             assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE));
-//
-//             assertEquals(slot1, 
availableSlots.poll(DEFAULT_TESTING_PROFILE));
-//             assertEquals(0, availableSlots.size());
-//             assertFalse(availableSlots.contains(slot1));
-//             assertFalse(availableSlots.containResource(resource1));
-//     }
-//
-//     static SlotDescriptor createSlotDescriptor(final ResourceID resourceID) 
{
-//             return createSlotDescriptor(resourceID, new JobID());
-//     }
-//
-//     static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, 
final JobID jobID) {
-//             return createSlotDescriptor(resourceID, jobID, 
DEFAULT_TESTING_PROFILE);
-//     }
-//
-//     static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, 
final JobID jobID,
-//             final ResourceProfile resourceProfile)
-//     {
-//             return createSlotDescriptor(resourceID, jobID, resourceProfile, 
0);
-//     }
-//
-//     static SlotDescriptor createSlotDescriptor(final ResourceID resourceID, 
final JobID jobID,
-//             final ResourceProfile resourceProfile, final int slotNumber)
-//     {
-//             TaskManagerLocation location = mock(TaskManagerLocation.class);
-//             when(location.getResourceID()).thenReturn(resourceID);
-//             return new SlotDescriptor(jobID, location, slotNumber, 
resourceProfile, mock(ActorGateway.class));
-//     }
-//}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.SlotAndLocality;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class AvailableSlotsTest {
+
+       static final ResourceProfile DEFAULT_TESTING_PROFILE = new 
ResourceProfile(1.0, 512);
+
+       static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE = new 
ResourceProfile(2.0, 1024);
+
+       @Test
+       public void testAddAndRemove() throws Exception {
+               SlotPool.AvailableSlots availableSlots = new 
SlotPool.AvailableSlots();
+
+               final ResourceID resource1 = new ResourceID("resource1");
+               final ResourceID resource2 = new ResourceID("resource2");
+
+               final AllocatedSlot slot1 = createAllocatedSlot(resource1);
+               final AllocatedSlot slot2 = createAllocatedSlot(resource1);
+               final AllocatedSlot slot3 = createAllocatedSlot(resource2);
+
+               availableSlots.add(slot1, 1L);
+               availableSlots.add(slot2, 2L);
+               availableSlots.add(slot3, 3L);
+
+               assertEquals(3, availableSlots.size());
+               
assertTrue(availableSlots.contains(slot1.getSlotAllocationId()));
+               
assertTrue(availableSlots.contains(slot2.getSlotAllocationId()));
+               
assertTrue(availableSlots.contains(slot3.getSlotAllocationId()));
+               assertTrue(availableSlots.containsTaskManager(resource1));
+               assertTrue(availableSlots.containsTaskManager(resource2));
+
+               availableSlots.removeAllForTaskManager(resource1);
+
+               assertEquals(1, availableSlots.size());
+               
assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
+               
assertFalse(availableSlots.contains(slot2.getSlotAllocationId()));
+               
assertTrue(availableSlots.contains(slot3.getSlotAllocationId()));
+               assertFalse(availableSlots.containsTaskManager(resource1));
+               assertTrue(availableSlots.containsTaskManager(resource2));
+
+               availableSlots.removeAllForTaskManager(resource2);
+
+               assertEquals(0, availableSlots.size());
+               
assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
+               
assertFalse(availableSlots.contains(slot2.getSlotAllocationId()));
+               
assertFalse(availableSlots.contains(slot3.getSlotAllocationId()));
+               assertFalse(availableSlots.containsTaskManager(resource1));
+               assertFalse(availableSlots.containsTaskManager(resource2));
+       }
+
+       @Test
+       public void testPollFreeSlot() {
+               SlotPool.AvailableSlots availableSlots = new 
SlotPool.AvailableSlots();
+
+               final ResourceID resource1 = new ResourceID("resource1");
+               final AllocatedSlot slot1 = createAllocatedSlot(resource1);
+
+               availableSlots.add(slot1, 1L);
+
+               assertEquals(1, availableSlots.size());
+               
assertTrue(availableSlots.contains(slot1.getSlotAllocationId()));
+               assertTrue(availableSlots.containsTaskManager(resource1));
+
+               assertNull(availableSlots.poll(DEFAULT_TESTING_BIG_PROFILE, 
null));
+
+               SlotAndLocality slotAndLocality = 
availableSlots.poll(DEFAULT_TESTING_PROFILE, null);
+               assertEquals(slot1, slotAndLocality.slot());
+               assertEquals(0, availableSlots.size());
+               
assertFalse(availableSlots.contains(slot1.getSlotAllocationId()));
+               assertFalse(availableSlots.containsTaskManager(resource1));
+       }
+
+       static AllocatedSlot createAllocatedSlot(final ResourceID resourceId) {
+               TaskManagerLocation mockTaskManagerLocation = 
mock(TaskManagerLocation.class);
+               
when(mockTaskManagerLocation.getResourceID()).thenReturn(resourceId);
+
+               TaskManagerGateway mockTaskManagerGateway = 
mock(TaskManagerGateway.class);
+
+               return new AllocatedSlot(
+                       new AllocationID(),
+                       new JobID(),
+                       mockTaskManagerLocation,
+                       0,
+                       DEFAULT_TESTING_PROFILE,
+                       mockTaskManagerGateway);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index cc1d194..5fa7af3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -1,299 +1,309 @@
-///*
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.runtime.instance;
-//
-//import org.apache.flink.api.common.JobID;
-//import org.apache.flink.api.common.time.Time;
-//import org.apache.flink.runtime.clusterframework.types.AllocationID;
-//import org.apache.flink.runtime.clusterframework.types.ResourceID;
-//import org.apache.flink.runtime.concurrent.BiFunction;
-//import org.apache.flink.runtime.concurrent.Future;
-//import org.apache.flink.runtime.jobgraph.JobVertexID;
-//import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-//import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
-//import org.apache.flink.runtime.resourcemanager.SlotRequest;
-//import org.apache.flink.util.TestLogger;
-//import org.junit.After;
-//import org.junit.Before;
-//import org.junit.Test;
-//
-//import java.util.UUID;
-//import java.util.concurrent.ExecutionException;
-//import java.util.concurrent.Executor;
-//import java.util.concurrent.ExecutorService;
-//import java.util.concurrent.Executors;
-//import java.util.concurrent.TimeUnit;
-//
-//import static 
org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
-//import static 
org.apache.flink.runtime.instance.AvailableSlotsTest.createSlotDescriptor;
-//import static org.junit.Assert.assertEquals;
-//import static org.junit.Assert.assertFalse;
-//import static org.junit.Assert.assertNotEquals;
-//import static org.junit.Assert.assertNotNull;
-//import static org.junit.Assert.assertNull;
-//import static org.junit.Assert.assertTrue;
-//import static org.junit.Assert.fail;
-//import static org.mockito.Matchers.any;
-//import static org.mockito.Mockito.mock;
-//import static org.mockito.Mockito.times;
-//import static org.mockito.Mockito.verify;
-//import static org.mockito.Mockito.when;
-//
-//public class SlotPoolTest extends TestLogger {
-//
-//     private ExecutorService executor;
-//
-//     private SlotPool slotPool;
-//
-//     private ResourceManagerGateway resourceManagerGateway;
-//
-//     @Before
-//     public void setUp() throws Exception {
-//             this.executor = Executors.newFixedThreadPool(1);
-//             this.slotPool = new SlotPool(executor);
-//             this.resourceManagerGateway = 
mock(ResourceManagerGateway.class);
-//             when(resourceManagerGateway
-//                     .requestSlot(any(UUID.class), any(UUID.class), 
any(SlotRequest.class), any(Time.class)))
-//                     .thenReturn(mock(Future.class));
-//
-//             slotPool.setResourceManager(UUID.randomUUID(), 
resourceManagerGateway);
-//             slotPool.setJobManagerLeaderId(UUID.randomUUID());
-//     }
-//
-//     @After
-//     public void tearDown() throws Exception {
-//     }
-//
-//     @Test
-//     public void testAllocateSimpleSlot() throws Exception {
-//             ResourceID resourceID = new ResourceID("resource");
-//             slotPool.registerResource(resourceID);
-//
-//             JobID jobID = new JobID();
-//             AllocationID allocationID = new AllocationID();
-//             Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID);
-//             assertFalse(future.isDone());
-//             verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), any(SlotRequest.class), any(Time.class));
-//
-//             SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//             assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-//
-//             SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
-//             assertTrue(future.isDone());
-//             assertTrue(slot.isAlive());
-//             assertEquals(resourceID, slot.getTaskManagerID());
-//             assertEquals(jobID, slot.getJobID());
-//             assertEquals(slotPool, slot.getOwner());
-//     }
-//
-//     @Test
-//     public void testAllocateSharedSlot() throws Exception {
-//             ResourceID resourceID = new ResourceID("resource");
-//             slotPool.registerResource(resourceID);
-//
-//             JobVertexID vid = new JobVertexID();
-//             SlotSharingGroup sharingGroup = new SlotSharingGroup(vid);
-//             SlotSharingGroupAssignment assignment = 
sharingGroup.getTaskAssignment();
-//
-//             JobID jobID = new JobID();
-//             AllocationID allocationID = new AllocationID();
-//             Future<SharedSlot> future = slotPool.allocateSharedSlot(jobID, 
DEFAULT_TESTING_PROFILE, assignment, allocationID);
-//
-//             assertFalse(future.isDone());
-//             verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), any(SlotRequest.class), any(Time.class));
-//
-//             SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//             assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-//
-//             SharedSlot slot = future.get(1, TimeUnit.SECONDS);
-//             assertTrue(future.isDone());
-//             assertTrue(slot.isAlive());
-//             assertEquals(resourceID, slot.getTaskManagerID());
-//             assertEquals(jobID, slot.getJobID());
-//             assertEquals(slotPool, slot.getOwner());
-//
-//             SimpleSlot simpleSlot = slot.allocateSubSlot(vid);
-//             assertNotNull(simpleSlot);
-//             assertTrue(simpleSlot.isAlive());
-//     }
-//
-//     @Test
-//     public void testAllocateSlotWithoutResourceManager() throws Exception {
-//             slotPool.disconnectResourceManager();
-//             Future<SimpleSlot> future = slotPool.allocateSimpleSlot(new 
JobID(), DEFAULT_TESTING_PROFILE);
-//             future.handleAsync(
-//                     new BiFunction<SimpleSlot, Throwable, Void>() {
-//                             @Override
-//                             public Void apply(SimpleSlot simpleSlot, 
Throwable throwable) {
-//                                     assertNull(simpleSlot);
-//                                     assertNotNull(throwable);
-//                                     return null;
-//                             }
-//                     },
-//                     executor);
-//             try {
-//                     future.get(1, TimeUnit.SECONDS);
-//                     fail("We expected a ExecutionException.");
-//             } catch (ExecutionException ex) {
-//                     // we expect the exception
-//             }
-//     }
-//
-//     @Test
-//     public void testAllocationFulfilledByReturnedSlot() throws Exception {
-//             ResourceID resourceID = new ResourceID("resource");
-//             slotPool.registerResource(resourceID);
-//
-//             JobID jobID = new JobID();
-//
-//             AllocationID allocationID1 = new AllocationID();
-//             Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID1);
-//
-//             AllocationID allocationID2 = new AllocationID();
-//             Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID2);
-//
-//             assertFalse(future1.isDone());
-//             assertFalse(future2.isDone());
-//             verify(resourceManagerGateway, times(2))
-//                     .requestSlot(any(UUID.class), any(UUID.class), 
any(SlotRequest.class), any(Time.class));
-//
-//             SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//             assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
-//
-//             SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-//             assertTrue(future1.isDone());
-//             assertFalse(future2.isDone());
-//
-//             // return this slot to pool
-//             slot1.releaseSlot();
-//
-//             // second allocation fulfilled by previous slot returning
-//             SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
-//             assertTrue(future2.isDone());
-//
-//             assertNotEquals(slot1, slot2);
-//             assertTrue(slot1.isReleased());
-//             assertTrue(slot2.isAlive());
-//             assertEquals(slot1.getTaskManagerID(), 
slot2.getTaskManagerID());
-//             assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
-//     }
-//
-//     @Test
-//     public void testAllocateWithFreeSlot() throws Exception {
-//             ResourceID resourceID = new ResourceID("resource");
-//             slotPool.registerResource(resourceID);
-//
-//             JobID jobID = new JobID();
-//             AllocationID allocationID1 = new AllocationID();
-//             Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID1);
-//             assertFalse(future1.isDone());
-//
-//             SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//             assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
-//
-//             SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-//             assertTrue(future1.isDone());
-//
-//             // return this slot to pool
-//             slot1.releaseSlot();
-//
-//             AllocationID allocationID2 = new AllocationID();
-//             Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID2);
-//
-//             // second allocation fulfilled by previous slot returning
-//             SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
-//             assertTrue(future2.isDone());
-//
-//             assertNotEquals(slot1, slot2);
-//             assertTrue(slot1.isReleased());
-//             assertTrue(slot2.isAlive());
-//             assertEquals(slot1.getTaskManagerID(), 
slot2.getTaskManagerID());
-//             assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
-//     }
-//
-//     @Test
-//     public void testOfferSlot() throws Exception {
-//             ResourceID resourceID = new ResourceID("resource");
-//             slotPool.registerResource(resourceID);
-//
-//             JobID jobID = new JobID();
-//             AllocationID allocationID = new AllocationID();
-//             Future<SimpleSlot> future = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID);
-//             assertFalse(future.isDone());
-//             verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), any(SlotRequest.class), any(Time.class));
-//
-//             // slot from unregistered resource
-//             SlotDescriptor invalid = createSlotDescriptor(new 
ResourceID("unregistered"), jobID, DEFAULT_TESTING_PROFILE);
-//             assertFalse(slotPool.offerSlot(allocationID, invalid));
-//
-//             SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//
-//             // reject offering with mismatch allocation id
-//             assertFalse(slotPool.offerSlot(new AllocationID(), 
slotDescriptor));
-//
-//             // accepted slot
-//             assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-//             SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
-//             assertTrue(future.isDone());
-//             assertTrue(slot.isAlive());
-//
-//             // conflict offer with using slot
-//             SlotDescriptor conflict = createSlotDescriptor(resourceID, 
jobID, DEFAULT_TESTING_PROFILE);
-//             assertFalse(slotPool.offerSlot(allocationID, conflict));
-//
-//             // duplicated offer with using slot
-//             assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-//             assertTrue(future.isDone());
-//             assertTrue(slot.isAlive());
-//
-//             // duplicated offer with free slot
-//             slot.releaseSlot();
-//             assertTrue(slot.isReleased());
-//             assertTrue(slotPool.offerSlot(allocationID, slotDescriptor));
-//     }
-//
-//     @Test
-//     public void testReleaseResource() throws Exception {
-//             ResourceID resourceID = new ResourceID("resource");
-//             slotPool.registerResource(resourceID);
-//
-//             JobID jobID = new JobID();
-//
-//             AllocationID allocationID1 = new AllocationID();
-//             Future<SimpleSlot> future1 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID1);
-//
-//             AllocationID allocationID2 = new AllocationID();
-//             Future<SimpleSlot> future2 = slotPool.allocateSimpleSlot(jobID, 
DEFAULT_TESTING_PROFILE, allocationID2);
-//
-//             SlotDescriptor slotDescriptor = 
createSlotDescriptor(resourceID, jobID, DEFAULT_TESTING_PROFILE);
-//             assertTrue(slotPool.offerSlot(allocationID1, slotDescriptor));
-//
-//             SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
-//             assertTrue(future1.isDone());
-//             assertFalse(future2.isDone());
-//
-//             slotPool.releaseResource(resourceID);
-//             assertTrue(slot1.isReleased());
-//
-//             // slot released and not usable, second allocation still not 
fulfilled
-//             Thread.sleep(10);
-//             assertFalse(future2.isDone());
-//     }
-//
-//}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.TestLogger;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SlotPoolTest extends TestLogger {
+
+       private RpcService rpcService;
+
+       private JobID jobId;
+
+       private MainThreadValidatorUtil mainThreadValidatorUtil;
+
+       private SlotPool slotPool;
+
+       private ResourceManagerGateway resourceManagerGateway;
+
+       @Before
+       public void setUp() throws Exception {
+
+               this.rpcService = new TestingSerialRpcService();
+               this.jobId = new JobID();
+               this.slotPool = new SlotPool(rpcService, jobId);
+
+               this.mainThreadValidatorUtil = new 
MainThreadValidatorUtil(slotPool);
+
+               mainThreadValidatorUtil.enterMainThread();
+
+               slotPool.start(UUID.randomUUID());
+
+               this.resourceManagerGateway = 
mock(ResourceManagerGateway.class);
+               when(resourceManagerGateway
+                       .requestSlot(any(UUID.class), any(UUID.class), 
any(SlotRequest.class), any(Time.class)))
+                       .thenReturn(mock(Future.class));
+
+               slotPool.connectToResourceManager(UUID.randomUUID(), 
resourceManagerGateway);
+       }
+
+       @After
+       public void tearDown() throws Exception {
+               mainThreadValidatorUtil.exitMainThread();
+       }
+
+       @Test
+       public void testAllocateSimpleSlot() throws Exception {
+               ResourceID resourceID = new ResourceID("resource");
+               slotPool.registerTaskManager(resourceID);
+
+               ScheduledUnit task = mock(ScheduledUnit.class);
+               Future<SimpleSlot> future = slotPool.allocateSlot(task, 
DEFAULT_TESTING_PROFILE, null);
+               assertFalse(future.isDone());
+
+               ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
+               verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+
+               final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
+
+               AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, 
slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+               assertTrue(slotPool.offerSlot(allocatedSlot));
+
+               SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+               assertTrue(future.isDone());
+               assertTrue(slot.isAlive());
+               assertEquals(resourceID, slot.getTaskManagerID());
+               assertEquals(jobId, slot.getJobID());
+               assertEquals(slotPool.getSlotOwner(), slot.getOwner());
+       }
+
+       @Test
+       public void testAllocateSlotWithoutResourceManager() throws Exception {
+               slotPool.disconnectResourceManager();
+               Future<SimpleSlot> future = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+               future.handleAsync(
+                       new BiFunction<SimpleSlot, Throwable, Void>() {
+                               @Override
+                               public Void apply(SimpleSlot simpleSlot, 
Throwable throwable) {
+                                       assertNull(simpleSlot);
+                                       assertNotNull(throwable);
+                                       return null;
+                               }
+                       },
+                       rpcService.getExecutor());
+               try {
+                       future.get(1, TimeUnit.SECONDS);
+                       fail("We expected a ExecutionException.");
+               } catch (ExecutionException ex) {
+                       // we expect the exception
+               }
+       }
+
+       @Test
+       public void testAllocationFulfilledByReturnedSlot() throws Exception {
+               ResourceID resourceID = new ResourceID("resource");
+               slotPool.registerTaskManager(resourceID);
+
+               Future<SimpleSlot> future1 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+               Future<SimpleSlot> future2 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+
+               assertFalse(future1.isDone());
+               assertFalse(future2.isDone());
+
+               ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
+               verify(resourceManagerGateway, times(2))
+                       .requestSlot(any(UUID.class), any(UUID.class), 
slotRequestArgumentCaptor.capture(), any(Time.class));
+
+               final List<SlotRequest> slotRequests = 
slotRequestArgumentCaptor.getAllValues();
+
+               AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, 
slotRequests.get(0).getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+               assertTrue(slotPool.offerSlot(allocatedSlot));
+
+               SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+               assertTrue(future1.isDone());
+               assertFalse(future2.isDone());
+
+               // return this slot to pool
+               slot1.releaseSlot();
+
+               // second allocation fulfilled by previous slot returning
+               SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+               assertTrue(future2.isDone());
+
+               assertNotEquals(slot1, slot2);
+               assertTrue(slot1.isReleased());
+               assertTrue(slot2.isAlive());
+               assertEquals(slot1.getTaskManagerID(), 
slot2.getTaskManagerID());
+               assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+       }
+
+       @Test
+       public void testAllocateWithFreeSlot() throws Exception {
+               ResourceID resourceID = new ResourceID("resource");
+               slotPool.registerTaskManager(resourceID);
+
+               Future<SimpleSlot> future1 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+               assertFalse(future1.isDone());
+
+               ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
+               verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+
+               final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
+
+               AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, 
slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+               assertTrue(slotPool.offerSlot(allocatedSlot));
+
+               SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+               assertTrue(future1.isDone());
+
+               // return this slot to pool
+               slot1.releaseSlot();
+
+               Future<SimpleSlot> future2 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+
+               // second allocation fulfilled by previous slot returning
+               SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS);
+               assertTrue(future2.isDone());
+
+               assertNotEquals(slot1, slot2);
+               assertTrue(slot1.isReleased());
+               assertTrue(slot2.isAlive());
+               assertEquals(slot1.getTaskManagerID(), 
slot2.getTaskManagerID());
+               assertEquals(slot1.getSlotNumber(), slot2.getSlotNumber());
+       }
+
+       @Test
+       public void testOfferSlot() throws Exception {
+               ResourceID resourceID = new ResourceID("resource");
+               slotPool.registerTaskManager(resourceID);
+
+               Future<SimpleSlot> future = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+               assertFalse(future.isDone());
+
+               ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
+               verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+
+               final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
+
+               // slot from unregistered resource
+               AllocatedSlot invalid = createAllocatedSlot(new 
ResourceID("unregistered"), slotRequest.getAllocationId(), jobId, 
DEFAULT_TESTING_PROFILE);
+               assertFalse(slotPool.offerSlot(invalid));
+
+               AllocatedSlot notRequested = createAllocatedSlot(resourceID, 
new AllocationID(), jobId, DEFAULT_TESTING_PROFILE);
+
+               // we'll also accept non requested slots
+               assertTrue(slotPool.offerSlot(notRequested));
+
+               AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, 
slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+
+               // accepted slot
+               assertTrue(slotPool.offerSlot(allocatedSlot));
+               SimpleSlot slot = future.get(1, TimeUnit.SECONDS);
+               assertTrue(future.isDone());
+               assertTrue(slot.isAlive());
+
+               // duplicated offer with using slot
+               assertTrue(slotPool.offerSlot(allocatedSlot));
+               assertTrue(future.isDone());
+               assertTrue(slot.isAlive());
+
+               // duplicated offer with free slot
+               slot.releaseSlot();
+               assertTrue(slot.isReleased());
+               assertTrue(slotPool.offerSlot(allocatedSlot));
+       }
+
+       @Test
+       public void testReleaseResource() throws Exception {
+               ResourceID resourceID = new ResourceID("resource");
+               slotPool.registerTaskManager(resourceID);
+
+               Future<SimpleSlot> future1 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+
+               ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = 
ArgumentCaptor.forClass(SlotRequest.class);
+               verify(resourceManagerGateway).requestSlot(any(UUID.class), 
any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class));
+
+               final SlotRequest slotRequest = 
slotRequestArgumentCaptor.getValue();
+
+               Future<SimpleSlot> future2 = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
+
+               AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, 
slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE);
+               assertTrue(slotPool.offerSlot(allocatedSlot));
+
+               SimpleSlot slot1 = future1.get(1, TimeUnit.SECONDS);
+               assertTrue(future1.isDone());
+               assertFalse(future2.isDone());
+
+               slotPool.releaseTaskManager(resourceID);
+               assertTrue(slot1.isReleased());
+
+               // slot released and not usable, second allocation still not 
fulfilled
+               Thread.sleep(10);
+               assertFalse(future2.isDone());
+       }
+
+       static AllocatedSlot createAllocatedSlot(
+                       final ResourceID resourceId,
+                       final AllocationID allocationId,
+                       final JobID jobId,
+                       final ResourceProfile resourceProfile) {
+               TaskManagerLocation mockTaskManagerLocation = 
mock(TaskManagerLocation.class);
+               
when(mockTaskManagerLocation.getResourceID()).thenReturn(resourceId);
+
+               TaskManagerGateway mockTaskManagerGateway = 
mock(TaskManagerGateway.class);
+
+               return new AllocatedSlot(
+                       allocationId,
+                       jobId,
+                       mockTaskManagerLocation,
+                       0,
+                       resourceProfile,
+                       mockTaskManagerGateway);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index f5b3892..2cf2d4d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -45,7 +45,7 @@ public class MiniClusterITCase extends TestLogger {
                executeJob(miniCluster);
        }
 
-//     @Test
+       @Test
        public void runJobWithMultipleRpcServices() throws Exception {
                MiniClusterConfiguration cfg = new MiniClusterConfiguration();
                cfg.setUseRpcServicePerComponent();
@@ -54,7 +54,7 @@ public class MiniClusterITCase extends TestLogger {
                executeJob(miniCluster);
        }
 
-//     @Test
+       @Test
        public void runJobWithMultipleJobManagers() throws Exception {
                MiniClusterConfiguration cfg = new MiniClusterConfiguration();
                cfg.setNumJobManagers(3);

http://git-wip-us.apache.org/repos/asf/flink/blob/522edae3/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 88906a7..1d30ea4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.rpc;
 
-import akka.dispatch.Futures;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.CompletableFuture;
@@ -210,9 +209,9 @@ public class TestingSerialRpcService implements RpcService {
                                if (returnType.equals(Future.class)) {
                                        try {
                                                Object result = 
handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, 
futureTimeout);
-                                               return 
Futures.successful(result);
+                                               return 
FlinkCompletableFuture.completed(result);
                                        } catch (Throwable e) {
-                                               return Futures.failed(e);
+                                               return 
FlinkCompletableFuture.completedExceptionally(e);
                                        }
                                } else {
                                        return 
handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, 
futureTimeout);

Reply via email to