http://git-wip-us.apache.org/repos/asf/flink/blob/74570d45/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
deleted file mode 100644
index 52d9d06..0000000
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/SlotManagerTest.java
+++ /dev/null
@@ -1,538 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.clusterframework.types.AllocationID;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
-import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.taskexecutor.SlotStatus;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-public class SlotManagerTest {
-
-       private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
-
-       private static final long DEFAULT_TESTING_MEMORY = 512;
-
-       private static final ResourceProfile DEFAULT_TESTING_PROFILE =
-               new ResourceProfile(DEFAULT_TESTING_CPU_CORES, 
DEFAULT_TESTING_MEMORY);
-
-       private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
-               new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, 
DEFAULT_TESTING_MEMORY * 2);
-
-       private ResourceManagerGateway resourceManagerGateway;
-
-       @Before
-       public void setUp() {
-               resourceManagerGateway = mock(ResourceManagerGateway.class);
-       }
-
-       /**
-        * Tests that there are no free slots when we request, need to allocate 
from cluster manager master
-        */
-       @Test
-       public void testRequestSlotWithoutFreeSlot() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertEquals(1, slotManager.getAllocatedContainers().size());
-               assertEquals(DEFAULT_TESTING_PROFILE, 
slotManager.getAllocatedContainers().get(0));
-       }
-
-       /**
-        * Tests that there are some free slots when we request, and the 
request is fulfilled immediately
-        */
-       @Test
-       public void testRequestSlotWithFreeSlot() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-
-               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
1);
-               assertEquals(1, slotManager.getFreeSlotCount());
-
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertEquals(0, slotManager.getAllocatedContainers().size());
-       }
-
-       /**
-        * Tests that there are some free slots when we request, but none of 
them are suitable
-        */
-       @Test
-       public void testRequestSlotWithoutSuitableSlot() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-
-               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
2);
-               assertEquals(2, slotManager.getFreeSlotCount());
-
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(2, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertEquals(1, slotManager.getAllocatedContainers().size());
-               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
-       }
-
-       /**
-        * Tests that we send duplicated slot request
-        */
-       @Test
-       public void testDuplicatedSlotRequest() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
1);
-
-               SlotRequest request1 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
-
-               slotManager.requestSlot(request1);
-               slotManager.requestSlot(request2);
-               slotManager.requestSlot(request2);
-               slotManager.requestSlot(request1);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertEquals(1, slotManager.getAllocatedContainers().size());
-               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
-       }
-
-       /**
-        * Tests that we send multiple slot requests
-        */
-       @Test
-       public void testRequestMultipleSlots() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
5);
-
-               // request 3 normal slots
-               for (int i = 0; i < 3; ++i) {
-                       slotManager.requestSlot(new SlotRequest(new JobID(), 
new AllocationID(), DEFAULT_TESTING_PROFILE));
-               }
-
-               // request 2 big slots
-               for (int i = 0; i < 2; ++i) {
-                       slotManager.requestSlot(new SlotRequest(new JobID(), 
new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-               }
-
-               // request 1 normal slot again
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-
-               assertEquals(4, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(2, slotManager.getPendingRequestCount());
-               assertEquals(2, slotManager.getAllocatedContainers().size());
-               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
-               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(1));
-       }
-
-       /**
-        * Tests that a new slot appeared in SlotReport, and we used it to 
fulfill a pending request
-        */
-       @Test
-       public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-               assertEquals(1, slotManager.getPendingRequestCount());
-
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
-       }
-
-       /**
-        * Tests that a new slot appeared in SlotReport, but we have no pending 
request
-        */
-       @Test
-       public void testNewlyAppearedFreeSlot() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-       }
-
-       /**
-        * Tests that a new slot appeared in SlotReport, but it't not suitable 
for all the pending requests
-        */
-       @Test
-       public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
-               assertEquals(1, slotManager.getPendingRequestCount());
-
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertFalse(slotManager.isAllocated(slotId));
-       }
-
-       /**
-        * Tests that a new slot appeared in SlotReport, and it's been reported 
using by some job
-        */
-       @Test
-       public void testNewlyAppearedInUseSlot() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertTrue(slotManager.isAllocated(slotId));
-       }
-
-       /**
-        * Tests that we had a slot in-use, and it's confirmed by SlotReport
-        */
-       @Test
-       public void testExistingInUseSlotUpdateStatus() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request);
-
-               // make this slot in use
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertTrue(slotManager.isAllocated(slotId));
-
-               // slot status is confirmed
-               SlotStatus slotStatus2 = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE,
-                       request.getAllocationId(), request.getJobId());
-               slotManager.updateSlotStatus(slotStatus2);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertTrue(slotManager.isAllocated(slotId));
-       }
-
-       /**
-        * Tests that we had a slot in-use, but it's empty according to the 
SlotReport
-        */
-       @Test
-       public void testExistingInUseSlotAdjustedToEmpty() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               SlotRequest request1 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request1);
-
-               // make this slot in use
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               // another request pending
-               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request2);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
-               assertTrue(slotManager.isAllocated(request1.getAllocationId()));
-
-
-               // but slot is reported empty again, request2 will be 
fulfilled, request1 will be missing
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
-               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-       }
-
-       /**
-        * Tests that we had a slot in use, and it's also reported in use by 
TaskManager, but the allocation
-        * information didn't match.
-        */
-       @Test
-       public void testExistingInUseSlotWithDifferentAllocationInfo() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request);
-
-               // make this slot in use
-               SlotID slotId = SlotID.generate();
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
-               assertTrue(slotManager.isAllocated(request.getAllocationId()));
-
-               SlotStatus slotStatus2 = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new AllocationID(), new JobID());
-               // update slot status with different allocation info
-               slotManager.updateSlotStatus(slotStatus2);
-
-               // original request is missing and won't be allocated
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
-               assertFalse(slotManager.isAllocated(request.getAllocationId()));
-               
assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
-       }
-
-       /**
-        * Tests that we had a free slot, and it's confirmed by SlotReport
-        */
-       @Test
-       public void testExistingEmptySlotUpdateStatus() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
-               slotManager.addFreeSlot(slot);
-
-               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(0, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-       }
-
-       /**
-        * Tests that we had a free slot, and it's reported in-use by 
TaskManager
-        */
-       @Test
-       public void testExistingEmptySlotAdjustedToInUse() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
-               slotManager.addFreeSlot(slot);
-
-               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
DEFAULT_TESTING_PROFILE,
-                       new AllocationID(), new JobID());
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slot.getSlotId()));
-       }
-
-       /**
-        * Tests that we did some allocation but failed / rejected by 
TaskManager, request will retry
-        */
-       @Test
-       public void testSlotAllocationFailedAtTaskManager() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
-               slotManager.addFreeSlot(slot);
-
-               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slot.getSlotId()));
-
-               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slot.getSlotId());
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-       }
-
-
-       /**
-        * Tests that we did some allocation but failed / rejected by 
TaskManager, and slot is occupied by another request
-        */
-       @Test
-       public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE);
-               slotManager.addFreeSlot(slot);
-
-               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request);
-
-               // slot is set empty by heartbeat
-               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
slot.getResourceProfile());
-               slotManager.updateSlotStatus(slotStatus);
-
-               // another request took this slot
-               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request2);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertFalse(slotManager.isAllocated(request.getAllocationId()));
-               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-
-               // original request should be pended
-               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slot.getSlotId());
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
-               assertFalse(slotManager.isAllocated(request.getAllocationId()));
-               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-       }
-
-       @Test
-       public void testNotifyTaskManagerFailure() {
-               TestingSlotManager slotManager = new 
TestingSlotManager(resourceManagerGateway);
-
-               ResourceID resource1 = ResourceID.generate();
-               ResourceID resource2 = ResourceID.generate();
-
-               ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 
1), DEFAULT_TESTING_PROFILE);
-               ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 
2), DEFAULT_TESTING_PROFILE);
-               ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 
1), DEFAULT_TESTING_PROFILE);
-               ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 
2), DEFAULT_TESTING_PROFILE);
-
-               slotManager.addFreeSlot(slot11);
-               slotManager.addFreeSlot(slot21);
-
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
-
-               assertEquals(2, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-
-               slotManager.addFreeSlot(slot12);
-               slotManager.addFreeSlot(slot22);
-
-               assertEquals(2, slotManager.getAllocatedSlotCount());
-               assertEquals(2, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-
-               slotManager.notifyTaskManagerFailure(resource2);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-
-               // notify an not exist resource failure
-               slotManager.notifyTaskManagerFailure(ResourceID.generate());
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(1, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-       }
-
-       // 
------------------------------------------------------------------------
-       //  testing utilities
-       // 
------------------------------------------------------------------------
-
-       private void directlyProvideFreeSlots(
-               final SlotManager slotManager,
-               final ResourceProfile resourceProfile,
-               final int freeSlotNum)
-       {
-               for (int i = 0; i < freeSlotNum; ++i) {
-                       slotManager.addFreeSlot(new 
ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile)));
-               }
-       }
-
-       // 
------------------------------------------------------------------------
-       //  testing classes
-       // 
------------------------------------------------------------------------
-
-       private static class TestingSlotManager extends SlotManager {
-
-               private final List<ResourceProfile> allocatedContainers;
-
-               TestingSlotManager(ResourceManagerGateway 
resourceManagerGateway) {
-                       super(resourceManagerGateway);
-                       this.allocatedContainers = new LinkedList<>();
-               }
-
-               /**
-                * Choose slot randomly if it matches requirement
-                *
-                * @param request   The slot request
-                * @param freeSlots All slots which can be used
-                * @return The chosen slot or null if cannot find a match
-                */
-               @Override
-               protected ResourceSlot chooseSlotToUse(SlotRequest request, 
Map<SlotID, ResourceSlot> freeSlots) {
-                       for (ResourceSlot slot : freeSlots.values()) {
-                               if 
(slot.isMatchingRequirement(request.getResourceProfile())) {
-                                       return slot;
-                               }
-                       }
-                       return null;
-               }
-
-               /**
-                * Choose request randomly if offered slot can match its 
requirement
-                *
-                * @param offeredSlot     The free slot
-                * @param pendingRequests All the pending slot requests
-                * @return The chosen request's AllocationID or null if cannot 
find a match
-                */
-               @Override
-               protected SlotRequest chooseRequestToFulfill(ResourceSlot 
offeredSlot,
-                       Map<AllocationID, SlotRequest> pendingRequests)
-               {
-                       for (Map.Entry<AllocationID, SlotRequest> 
pendingRequest : pendingRequests.entrySet()) {
-                               if 
(offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile()))
 {
-                                       return pendingRequest.getValue();
-                               }
-                       }
-                       return null;
-               }
-
-               @Override
-               protected void allocateContainer(ResourceProfile 
resourceProfile) {
-                       allocatedContainers.add(resourceProfile);
-               }
-
-               List<ResourceProfile> getAllocatedContainers() {
-                       return allocatedContainers;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/74570d45/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
new file mode 100644
index 0000000..9ee9690
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -0,0 +1,554 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+
+public class SlotManagerTest {
+
+       private static final double DEFAULT_TESTING_CPU_CORES = 1.0;
+
+       private static final long DEFAULT_TESTING_MEMORY = 512;
+
+       private static final ResourceProfile DEFAULT_TESTING_PROFILE =
+               new ResourceProfile(DEFAULT_TESTING_CPU_CORES, 
DEFAULT_TESTING_MEMORY);
+
+       private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
+               new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, 
DEFAULT_TESTING_MEMORY * 2);
+
+       private static TaskExecutorGateway taskExecutorGateway;
+
+       @BeforeClass
+       public static void setUp() {
+               taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
+       }
+
+       /**
+        * Tests that there are no free slots when we request, need to allocate 
from cluster manager master
+        */
+       @Test
+       public void testRequestSlotWithoutFreeSlot() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertEquals(1, slotManager.getAllocatedContainers().size());
+               assertEquals(DEFAULT_TESTING_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+       }
+
+       /**
+        * Tests that there are some free slots when we request, and the 
request is fulfilled immediately
+        */
+       @Test
+       public void testRequestSlotWithFreeSlot() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+
+               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
1);
+               assertEquals(1, slotManager.getFreeSlotCount());
+
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertEquals(0, slotManager.getAllocatedContainers().size());
+       }
+
+       /**
+        * Tests that there are some free slots when we request, but none of 
them are suitable
+        */
+       @Test
+       public void testRequestSlotWithoutSuitableSlot() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+
+               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
2);
+               assertEquals(2, slotManager.getFreeSlotCount());
+
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(2, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertEquals(1, slotManager.getAllocatedContainers().size());
+               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+       }
+
+       /**
+        * Tests that we send duplicated slot request
+        */
+       @Test
+       public void testDuplicatedSlotRequest() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
1);
+
+               SlotRequest request1 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE);
+
+               slotManager.requestSlot(request1);
+               slotManager.requestSlot(request2);
+               slotManager.requestSlot(request2);
+               slotManager.requestSlot(request1);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertEquals(1, slotManager.getAllocatedContainers().size());
+               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+       }
+
+       /**
+        * Tests that we send multiple slot requests
+        */
+       @Test
+       public void testRequestMultipleSlots() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+               directlyProvideFreeSlots(slotManager, DEFAULT_TESTING_PROFILE, 
5);
+
+               // request 3 normal slots
+               for (int i = 0; i < 3; ++i) {
+                       slotManager.requestSlot(new SlotRequest(new JobID(), 
new AllocationID(), DEFAULT_TESTING_PROFILE));
+               }
+
+               // request 2 big slots
+               for (int i = 0; i < 2; ++i) {
+                       slotManager.requestSlot(new SlotRequest(new JobID(), 
new AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+               }
+
+               // request 1 normal slot again
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+
+               assertEquals(4, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(2, slotManager.getPendingRequestCount());
+               assertEquals(2, slotManager.getAllocatedContainers().size());
+               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(0));
+               assertEquals(DEFAULT_TESTING_BIG_PROFILE, 
slotManager.getAllocatedContainers().get(1));
+       }
+
+       /**
+        * Tests that a new slot appeared in SlotReport, and we used it to 
fulfill a pending request
+        */
+       @Test
+       public void testNewlyAppearedFreeSlotFulfillPendingRequest() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+               assertEquals(1, slotManager.getPendingRequestCount());
+
+               SlotID slotId = SlotID.generate();
+               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+       }
+
+       /**
+        * Tests that a new slot appeared in SlotReport, but we have no pending 
request
+        */
+       @Test
+       public void testNewlyAppearedFreeSlot() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+
+               SlotID slotId = SlotID.generate();
+               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+       }
+
+       /**
+        * Tests that a new slot appeared in SlotReport, but it't not suitable 
for all the pending requests
+        */
+       @Test
+       public void testNewlyAppearedFreeSlotNotMatchPendingRequests() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_BIG_PROFILE));
+               assertEquals(1, slotManager.getPendingRequestCount());
+
+               SlotID slotId = SlotID.generate();
+               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertFalse(slotManager.isAllocated(slotId));
+       }
+
+       /**
+        * Tests that a new slot appeared in SlotReport, and it's been reported 
using by some job
+        */
+       @Test
+       public void testNewlyAppearedInUseSlot() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+
+               SlotID slotId = SlotID.generate();
+               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertTrue(slotManager.isAllocated(slotId));
+       }
+
+       /**
+        * Tests that we had a slot in-use, and it's confirmed by SlotReport
+        */
+       @Test
+       public void testExistingInUseSlotUpdateStatus() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request);
+
+               // make this slot in use
+               SlotID slotId = SlotID.generate();
+               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertTrue(slotManager.isAllocated(slotId));
+
+               // slot status is confirmed
+               SlotStatus slotStatus2 = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE,
+                       request.getJobId(), request.getAllocationId());
+               slotManager.updateSlotStatus(slotStatus2);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertTrue(slotManager.isAllocated(slotId));
+       }
+
+       /**
+        * Tests that we had a slot in-use, but it's empty according to the 
SlotReport
+        */
+       @Test
+       public void testExistingInUseSlotAdjustedToEmpty() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+               SlotRequest request1 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request1);
+
+               // make this slot in use
+               SlotID slotId = SlotID.generate();
+               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               // another request pending
+               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request2);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+               assertTrue(slotManager.isAllocated(request1.getAllocationId()));
+
+
+               // but slot is reported empty again, request2 will be 
fulfilled, request1 will be missing
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+       }
+
+       /**
+        * Tests that we had a slot in use, and it's also reported in use by 
TaskManager, but the allocation
+        * information didn't match.
+        */
+       @Test
+       public void testExistingInUseSlotWithDifferentAllocationInfo() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request);
+
+               // make this slot in use
+               SlotID slotId = SlotID.generate();
+               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+               assertTrue(slotManager.isAllocated(request.getAllocationId()));
+
+               SlotStatus slotStatus2 = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
+               // update slot status with different allocation info
+               slotManager.updateSlotStatus(slotStatus2);
+
+               // original request is missing and won't be allocated
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slotId));
+               assertFalse(slotManager.isAllocated(request.getAllocationId()));
+               
assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
+       }
+
+       /**
+        * Tests that we had a free slot, and it's confirmed by SlotReport
+        */
+       @Test
+       public void testExistingEmptySlotUpdateStatus() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+               slotManager.addFreeSlot(slot);
+
+               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
DEFAULT_TESTING_PROFILE);
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+       }
+
+       /**
+        * Tests that we had a free slot, and it's reported in-use by 
TaskManager
+        */
+       @Test
+       public void testExistingEmptySlotAdjustedToInUse() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+               final SlotID slotID = SlotID.generate();
+               slotManager.registerTaskExecutor(slotID.getResourceID(), 
taskExecutorGateway);
+
+               ResourceSlot slot = new ResourceSlot(slotID, 
DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+               slotManager.addFreeSlot(slot);
+
+               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
DEFAULT_TESTING_PROFILE,
+                       new JobID(), new AllocationID());
+               slotManager.updateSlotStatus(slotStatus);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slot.getSlotId()));
+       }
+
+       /**
+        * Tests that we did some allocation but failed / rejected by 
TaskManager, request will retry
+        */
+       @Test
+       public void testSlotAllocationFailedAtTaskManager() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+               slotManager.addFreeSlot(slot);
+
+               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertTrue(slotManager.isAllocated(slot.getSlotId()));
+
+               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slot.getSlotId());
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+       }
+
+
+       /**
+        * Tests that we did some allocation but failed / rejected by 
TaskManager, and slot is occupied by another request
+        */
+       @Test
+       public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+               final SlotID slotID = SlotID.generate();
+               slotManager.registerTaskExecutor(slotID.getResourceID(), 
taskExecutorGateway);
+
+               ResourceSlot slot = new ResourceSlot(slotID, 
DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+               slotManager.addFreeSlot(slot);
+
+               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request);
+
+               // slot is set empty by heartbeat
+               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
slot.getResourceProfile());
+               slotManager.updateSlotStatus(slotStatus);
+
+               // another request took this slot
+               SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               slotManager.requestSlot(request2);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+               assertFalse(slotManager.isAllocated(request.getAllocationId()));
+               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+
+               // original request should be pended
+               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slot.getSlotId());
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(1, slotManager.getPendingRequestCount());
+               assertFalse(slotManager.isAllocated(request.getAllocationId()));
+               assertTrue(slotManager.isAllocated(request2.getAllocationId()));
+       }
+
+       @Test
+       public void testNotifyTaskManagerFailure() {
+               TestingSlotManager slotManager = new TestingSlotManager();
+
+               ResourceID resource1 = ResourceID.generate();
+               ResourceID resource2 = ResourceID.generate();
+
+               ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 
1), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+               ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 
2), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+               ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 
1), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+               ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 
2), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+
+               slotManager.addFreeSlot(slot11);
+               slotManager.addFreeSlot(slot21);
+
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+               slotManager.requestSlot(new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE));
+
+               assertEquals(2, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+
+               slotManager.addFreeSlot(slot12);
+               slotManager.addFreeSlot(slot22);
+
+               assertEquals(2, slotManager.getAllocatedSlotCount());
+               assertEquals(2, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+
+               slotManager.notifyTaskManagerFailure(resource2);
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+
+               // notify an not exist resource failure
+               slotManager.notifyTaskManagerFailure(ResourceID.generate());
+
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+       }
+
+       // 
------------------------------------------------------------------------
+       //  testing utilities
+       // 
------------------------------------------------------------------------
+
+       private void directlyProvideFreeSlots(
+               final SlotManager slotManager,
+               final ResourceProfile resourceProfile,
+               final int freeSlotNum)
+       {
+               for (int i = 0; i < freeSlotNum; ++i) {
+                       slotManager.addFreeSlot(new 
ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile), 
taskExecutorGateway));
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  testing classes
+       // 
------------------------------------------------------------------------
+
+       private static class TestingSlotManager extends SlotManager {
+
+               private final List<ResourceProfile> allocatedContainers;
+
+               TestingSlotManager() {
+                       this.allocatedContainers = new LinkedList<>();
+               }
+
+               /**
+                * Choose slot randomly if it matches requirement
+                *
+                * @param request   The slot request
+                * @param freeSlots All slots which can be used
+                * @return The chosen slot or null if cannot find a match
+                */
+               @Override
+               protected ResourceSlot chooseSlotToUse(SlotRequest request, 
Map<SlotID, ResourceSlot> freeSlots) {
+                       for (ResourceSlot slot : freeSlots.values()) {
+                               if 
(slot.isMatchingRequirement(request.getResourceProfile())) {
+                                       return slot;
+                               }
+                       }
+                       return null;
+               }
+
+               /**
+                * Choose request randomly if offered slot can match its 
requirement
+                *
+                * @param offeredSlot     The free slot
+                * @param pendingRequests All the pending slot requests
+                * @return The chosen request's AllocationID or null if cannot 
find a match
+                */
+               @Override
+               protected SlotRequest chooseRequestToFulfill(ResourceSlot 
offeredSlot,
+                       Map<AllocationID, SlotRequest> pendingRequests)
+               {
+                       for (Map.Entry<AllocationID, SlotRequest> 
pendingRequest : pendingRequests.entrySet()) {
+                               if 
(offeredSlot.isMatchingRequirement(pendingRequest.getValue().getResourceProfile()))
 {
+                                       return pendingRequest.getValue();
+                               }
+                       }
+                       return null;
+               }
+
+               @Override
+               protected void allocateContainer(ResourceProfile 
resourceProfile) {
+                       allocatedContainers.add(resourceProfile);
+               }
+
+               List<ResourceProfile> getAllocatedContainers() {
+                       return allocatedContainers;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/74570d45/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
new file mode 100644
index 0000000..85d2880
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager.slotmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.highavailability.NonHaServices;
+import org.apache.flink.runtime.jobmaster.JobMasterGateway;
+import org.apache.flink.runtime.resourcemanager.JobMasterRegistration;
+import org.apache.flink.runtime.resourcemanager.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import org.apache.flink.runtime.rpc.TestingSerialRpcService;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.SlotStatus;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Mockito;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Duration;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+
+public class SlotProtocolTest extends TestLogger {
+
+       private static TestingSerialRpcService testRpcService;
+
+       @BeforeClass
+       public static void beforeClass() {
+               testRpcService = new TestingSerialRpcService();
+       }
+
+       @AfterClass
+       public static void afterClass() {
+               testRpcService.stopService();
+               testRpcService = null;
+       }
+
+       @Before
+       public void beforeTest(){
+               testRpcService.clearGateways();
+       }
+
+       /**
+        * Tests whether
+        * 1) SlotRequest is routed to the SlotManager
+        * 2) SlotRequest is confirmed
+        * 3) SlotRequest leads to a container allocation
+        * 4) Slot becomes available and TaskExecutor gets a SlotRequest
+        */
+       @Test
+       public void testSlotsUnavailableRequest() throws Exception {
+               final String rmAddress = "/rm1";
+               final String jmAddress = "/jm1";
+               final JobID jobID = new JobID();
+
+               testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
+
+
+               TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
+               ResourceManager resourceManager =
+                       new ResourceManager(testRpcService, new 
NonHaServices(rmAddress), slotManager);
+               resourceManager.start();
+
+               Future<RegistrationResponse> registrationFuture =
+                       resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, jobID));
+               try {
+                       Await.ready(registrationFuture, Duration.create(5, 
TimeUnit.SECONDS));
+               } catch (Exception e) {
+                       Assert.fail("JobManager registration Future didn't 
become ready.");
+               }
+
+               final AllocationID allocationID = new AllocationID();
+               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 100);
+
+               SlotRequest slotRequest = new SlotRequest(jobID, allocationID, 
resourceProfile);
+               SlotRequestReply slotRequestReply =
+                       resourceManager.requestSlot(slotRequest);
+
+               // 1) SlotRequest is routed to the SlotManager
+               verify(slotManager).requestSlot(slotRequest);
+
+               // 2) SlotRequest is confirmed
+               Assert.assertEquals(
+                       slotRequestReply.getAllocationID(),
+                       allocationID);
+
+               // 3) SlotRequest leads to a container allocation
+               verify(slotManager, 
timeout(5000)).allocateContainer(resourceProfile);
+
+               Assert.assertFalse(slotManager.isAllocated(allocationID));
+
+               // slot becomes available
+               final String tmAddress = "/tm1";
+               TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               testRpcService.registerGateway(tmAddress, taskExecutorGateway);
+
+               final ResourceID resourceID = ResourceID.generate();
+               final SlotID slotID = new SlotID(resourceID, 0);
+
+               final SlotStatus slotStatus =
+                       new SlotStatus(slotID, resourceProfile);
+               final SlotReport slotReport =
+                       new SlotReport(Collections.singletonList(slotStatus), 
resourceID);
+               // register slot at SlotManager
+               slotManager.registerTaskExecutor(resourceID, 
taskExecutorGateway);
+               slotManager.updateSlotStatus(slotReport);
+
+               // 4) Slot becomes available and TaskExecutor gets a SlotRequest
+               verify(taskExecutorGateway, 
timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), 
any(FiniteDuration.class));
+       }
+
+       /**
+        * Tests whether
+        * 1) a SlotRequest is routed to the SlotManager
+        * 2) a SlotRequest is confirmed
+        * 3) a SlotRequest leads to an allocation of a registered slot
+        * 4) a SlotRequest is routed to the TaskExecutor
+        */
+       @Test
+       public void testSlotAvailableRequest() throws Exception {
+               final String rmAddress = "/rm1";
+               final String jmAddress = "/jm1";
+               final String tmAddress = "/tm1";
+               final JobID jobID = new JobID();
+
+               testRpcService.registerGateway(jmAddress, 
mock(JobMasterGateway.class));
+
+               TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
+               testRpcService.registerGateway(tmAddress, taskExecutorGateway);
+
+               TestingSlotManager slotManager = Mockito.spy(new 
TestingSlotManager());
+               ResourceManager resourceManager =
+                       new ResourceManager(testRpcService, new 
NonHaServices(rmAddress), slotManager);
+               resourceManager.start();
+
+               Future<RegistrationResponse> registrationFuture =
+                       resourceManager.registerJobMaster(new 
JobMasterRegistration(jmAddress, jobID));
+               try {
+                       Await.ready(registrationFuture, Duration.create(5, 
TimeUnit.SECONDS));
+               } catch (Exception e) {
+                       Assert.fail("JobManager registration Future didn't 
become ready.");
+               }
+
+               final ResourceID resourceID = ResourceID.generate();
+               final AllocationID allocationID = new AllocationID();
+               final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 100);
+               final SlotID slotID = new SlotID(resourceID, 0);
+
+               final SlotStatus slotStatus =
+                       new SlotStatus(slotID, resourceProfile);
+               final SlotReport slotReport =
+                       new SlotReport(Collections.singletonList(slotStatus), 
resourceID);
+               // register slot at SlotManager
+               slotManager.registerTaskExecutor(resourceID, 
taskExecutorGateway);
+               slotManager.updateSlotStatus(slotReport);
+
+               SlotRequest slotRequest = new SlotRequest(jobID, allocationID, 
resourceProfile);
+               SlotRequestReply slotRequestReply =
+                       resourceManager.requestSlot(slotRequest);
+
+               // 1) a SlotRequest is routed to the SlotManager
+               verify(slotManager).requestSlot(slotRequest);
+
+               // 2) a SlotRequest is confirmed
+               Assert.assertEquals(
+                       slotRequestReply.getAllocationID(),
+                       allocationID);
+
+               // 3) a SlotRequest leads to an allocation of a registered slot
+               Assert.assertTrue(slotManager.isAllocated(slotID));
+               Assert.assertTrue(slotManager.isAllocated(allocationID));
+
+
+               // 4) a SlotRequest is routed to the TaskExecutor
+               verify(taskExecutorGateway, 
timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), 
any(FiniteDuration.class));
+       }
+
+
+       private static class TestingSlotManager extends SimpleSlotManager {
+
+               // change visibility of function to public for testing
+               @Override
+               public void allocateContainer(ResourceProfile resourceProfile) {
+                       super.allocateContainer(resourceProfile);
+               }
+
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/74570d45/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 7e92e8d..2212680 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -112,4 +112,8 @@ public class TestingRpcService extends AkkaRpcService {
                        return Futures.failed(new Exception("No gateway 
registered under that name"));
                }
        }
-}
\ No newline at end of file
+
+       public void clearGateways() {
+               registeredConnections.clear();
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/74570d45/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 955edcc..01776ed 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -137,6 +137,10 @@ public class TestingSerialRpcService implements RpcService 
{
                }
        }
 
+       public void clearGateways() {
+               registeredConnections.clear();
+       }
+
        private static final class TestingSerialInvocationHandler<C extends 
RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, 
MainThreadExecutor, StartStoppable {
 
                private final T rpcEndpoint;

Reply via email to