http://git-wip-us.apache.org/repos/asf/flink/blob/5915613d/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
index 8f09152..14afd0e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java
@@ -25,10 +25,8 @@ import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -160,7 +158,7 @@ public class ResourceManagerJobMasterTest {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                
highAvailabilityServices.setResourceManagerLeaderElectionService(resourceManagerLeaderElectionService);
                highAvailabilityServices.setJobMasterLeaderRetriever(jobID, 
jobMasterLeaderRetrievalService);
-               ResourceManager resourceManager = new 
StandaloneResourceManager(rpcService, highAvailabilityServices, new 
SimpleSlotManager());
+               ResourceManager resourceManager = new 
TestingResourceManager(rpcService, highAvailabilityServices);
                resourceManager.start();
                return resourceManager;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5915613d/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index e6d1ed5..a577c26 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -22,10 +22,9 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.Future;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
-import org.apache.flink.runtime.resourcemanager.slotmanager.SimpleSlotManager;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.junit.After;
@@ -44,9 +43,24 @@ public class ResourceManagerTaskExecutorTest {
 
        private TestingSerialRpcService rpcService;
 
+       private SlotReport slotReport = new SlotReport();
+
+       private static String taskExecutorAddress = "/taskExecutor1";
+
+       private ResourceID taskExecutorResourceID;
+
+       private StandaloneResourceManager resourceManager;
+
+       private UUID leaderSessionId;
+
        @Before
        public void setup() throws Exception {
                rpcService = new TestingSerialRpcService();
+
+               taskExecutorResourceID = mockTaskExecutor(taskExecutorAddress);
+               TestingLeaderElectionService rmLeaderElectionService = new 
TestingLeaderElectionService();
+               resourceManager = 
createAndStartResourceManager(rmLeaderElectionService);
+               leaderSessionId = grantLeadership(rmLeaderElectionService);
        }
 
        @After
@@ -59,19 +73,15 @@ public class ResourceManagerTaskExecutorTest {
         */
        @Test
        public void testRegisterTaskExecutor() throws Exception {
-               String taskExecutorAddress = "/taskExecutor1";
-               ResourceID taskExecutorResourceID = 
mockTaskExecutor(taskExecutorAddress);
-               TestingLeaderElectionService rmLeaderElectionService = new 
TestingLeaderElectionService();
-               final ResourceManager resourceManager = 
createAndStartResourceManager(rmLeaderElectionService);
-               final UUID leaderSessionId = 
grantLeadership(rmLeaderElectionService);
-
                // test response successful
-               Future<RegistrationResponse> successfulFuture = 
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, 
taskExecutorResourceID);
+               Future<RegistrationResponse> successfulFuture =
+                       resourceManager.registerTaskExecutor(leaderSessionId, 
taskExecutorAddress, taskExecutorResourceID, slotReport);
                RegistrationResponse response = successfulFuture.get(5, 
TimeUnit.SECONDS);
                assertTrue(response instanceof TaskExecutorRegistrationSuccess);
 
                // test response successful with instanceID not equal to 
previous when receive duplicate registration from taskExecutor
-               Future<RegistrationResponse> duplicateFuture = 
resourceManager.registerTaskExecutor(leaderSessionId, taskExecutorAddress, 
taskExecutorResourceID);
+               Future<RegistrationResponse> duplicateFuture =
+                       resourceManager.registerTaskExecutor(leaderSessionId, 
taskExecutorAddress, taskExecutorResourceID, slotReport);
                RegistrationResponse duplicateResponse = duplicateFuture.get();
                assertTrue(duplicateResponse instanceof 
TaskExecutorRegistrationSuccess);
                assertNotEquals(((TaskExecutorRegistrationSuccess) 
response).getRegistrationId(), ((TaskExecutorRegistrationSuccess) 
duplicateResponse).getRegistrationId());
@@ -82,15 +92,10 @@ public class ResourceManagerTaskExecutorTest {
         */
        @Test
        public void testRegisterTaskExecutorWithUnmatchedLeaderSessionId() 
throws Exception {
-               String taskExecutorAddress = "/taskExecutor1";
-               ResourceID taskExecutorResourceID = 
mockTaskExecutor(taskExecutorAddress);
-               TestingLeaderElectionService rmLeaderElectionService = new 
TestingLeaderElectionService();
-               final ResourceManager resourceManager = 
createAndStartResourceManager(rmLeaderElectionService);
-               final UUID leaderSessionId = 
grantLeadership(rmLeaderElectionService);
-
                // test throw exception when receive a registration from 
taskExecutor which takes unmatched leaderSessionId
                UUID differentLeaderSessionID = UUID.randomUUID();
-               Future<RegistrationResponse> unMatchedLeaderFuture = 
resourceManager.registerTaskExecutor(differentLeaderSessionID, 
taskExecutorAddress, taskExecutorResourceID);
+               Future<RegistrationResponse> unMatchedLeaderFuture =
+                       
resourceManager.registerTaskExecutor(differentLeaderSessionID, 
taskExecutorAddress, taskExecutorResourceID, slotReport);
                assertTrue(unMatchedLeaderFuture.get(5, TimeUnit.SECONDS) 
instanceof RegistrationResponse.Decline);
        }
 
@@ -99,15 +104,10 @@ public class ResourceManagerTaskExecutorTest {
         */
        @Test
        public void testRegisterTaskExecutorFromInvalidAddress() throws 
Exception {
-               String taskExecutorAddress = "/taskExecutor1";
-               ResourceID taskExecutorResourceID = 
mockTaskExecutor(taskExecutorAddress);
-               TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
-               final ResourceManager resourceManager = 
createAndStartResourceManager(leaderElectionService);
-               final UUID leaderSessionId = 
grantLeadership(leaderElectionService);
-
                // test throw exception when receive a registration from 
taskExecutor which takes invalid address
                String invalidAddress = "/taskExecutor2";
-               Future<RegistrationResponse> invalidAddressFuture = 
resourceManager.registerTaskExecutor(leaderSessionId, invalidAddress, 
taskExecutorResourceID);
+               Future<RegistrationResponse> invalidAddressFuture =
+                       resourceManager.registerTaskExecutor(leaderSessionId, 
invalidAddress, taskExecutorResourceID, slotReport);
                assertTrue(invalidAddressFuture.get(5, TimeUnit.SECONDS) 
instanceof RegistrationResponse.Decline);
        }
 
@@ -118,10 +118,11 @@ public class ResourceManagerTaskExecutorTest {
                return taskExecutorResourceID;
        }
 
-       private ResourceManager 
createAndStartResourceManager(TestingLeaderElectionService 
rmLeaderElectionService) {
+       private StandaloneResourceManager 
createAndStartResourceManager(TestingLeaderElectionService 
rmLeaderElectionService) {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
                
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
-               ResourceManager resourceManager = new 
StandaloneResourceManager(rpcService, highAvailabilityServices, new 
SimpleSlotManager());
+               StandaloneResourceManager resourceManager =
+                       new TestingResourceManager(rpcService, 
highAvailabilityServices);
                resourceManager.start();
                return resourceManager;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/5915613d/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
new file mode 100644
index 0000000..6b4ca14
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingResourceManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory;
+import org.apache.flink.runtime.rpc.RpcService;
+/** Test-only {@link StandaloneResourceManager} whose shorter constructors default to testing HA services and a {@link TestingSlotManager}. */
+public class TestingResourceManager extends StandaloneResourceManager {
+       /** Uses a new {@link TestingHighAvailabilityServices} instance and the default testing slot manager factory. */
+       public TestingResourceManager(RpcService rpcService) {
+               this(rpcService, new TestingHighAvailabilityServices());
+       }
+       /** Uses the given HA services together with a factory that produces {@link TestingSlotManager} instances. */
+       public TestingResourceManager(
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices) {
+               this(rpcService, highAvailabilityServices, new 
TestingSlotManagerFactory());
+       }
+       /** Passes all three arguments unchanged to the {@link StandaloneResourceManager} constructor. */
+       public TestingResourceManager(
+                       RpcService rpcService,
+                       HighAvailabilityServices highAvailabilityServices,
+                       SlotManagerFactory slotManagerFactory) {
+               super(rpcService, highAvailabilityServices, slotManagerFactory);
+       }
+       /** Factory that builds a {@link TestingSlotManager} from the resource manager services it is handed. */
+       private static class TestingSlotManagerFactory implements 
SlotManagerFactory {
+
+               @Override
+               public SlotManager create(ResourceManagerServices rmServices) {
+                       return new TestingSlotManager(rmServices);
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5915613d/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
new file mode 100644
index 0000000..0b2c42b
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/TestingSlotManager.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.ResourceSlot;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.mockito.Mockito;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.Executor;
+/** {@link SlotManager} for tests: slots and requests are matched by taking the first entry that map iteration yields. */
+public class TestingSlotManager extends SlotManager {
+       /** Creates a slot manager backed by the stub {@code TestingResourceManagerServices} nested in this class. */
+       public TestingSlotManager() {
+               this(new TestingResourceManagerServices());
+       }
+       /** Creates a slot manager that uses the given resource manager services. */
+       public TestingSlotManager(ResourceManagerServices rmServices) {
+               super(rmServices);
+       }
+       /** Returns the first free slot in iteration order, or {@code null} when there is none; {@code request} is not inspected. */
+       @Override
+       protected ResourceSlot chooseSlotToUse(SlotRequest request, Map<SlotID, 
ResourceSlot> freeSlots) {
+               final Iterator<ResourceSlot> slotIterator = 
freeSlots.values().iterator();
+               if (slotIterator.hasNext()) {
+                       return slotIterator.next();
+               } else {
+                       return null;
+               }
+       }
+       /** Returns the first pending request in iteration order, or {@code null} when there is none; {@code offeredSlot} is not inspected. */
+       @Override
+       protected SlotRequest chooseRequestToFulfill(ResourceSlot offeredSlot, 
Map<AllocationID, SlotRequest> pendingRequests) {
+               final Iterator<SlotRequest> requestIterator = 
pendingRequests.values().iterator();
+               if (requestIterator.hasNext()) {
+                       return requestIterator.next();
+               } else {
+                       return null;
+               }
+       }
+       /** Stub services: resource allocation does nothing and both executor getters return Mockito mocks. */
+       private static class TestingResourceManagerServices implements 
ResourceManagerServices {
+
+               @Override
+               public void allocateResource(ResourceProfile resourceProfile) {
+                       // no-op: the requested resource profile is ignored
+               }
+               // a new mock Executor is created on every call
+               @Override
+               public Executor getAsyncExecutor() {
+                       return Mockito.mock(Executor.class);
+               }
+               // a new mock Executor is created on every call
+               @Override
+               public Executor getMainThreadExecutor() {
+                       return Mockito.mock(Executor.class);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5915613d/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 0fed79e..0d2b40d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -28,13 +28,16 @@ import 
org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
-import org.apache.flink.runtime.resourcemanager.SlotRequestReply;
+import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -45,7 +48,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
 
 public class SlotManagerTest {
 
@@ -59,13 +61,15 @@ public class SlotManagerTest {
        private static final ResourceProfile DEFAULT_TESTING_BIG_PROFILE =
                new ResourceProfile(DEFAULT_TESTING_CPU_CORES * 2, 
DEFAULT_TESTING_MEMORY * 2);
 
-       private static TaskExecutorGateway taskExecutorGateway;
+       private static TaskExecutorRegistration taskExecutorRegistration;
 
        @BeforeClass
        public static void setUp() {
-               taskExecutorGateway = Mockito.mock(TaskExecutorGateway.class);
-               
Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), 
any(UUID.class), any(Time.class)))
-                       .thenReturn(new 
FlinkCompletableFuture<SlotRequestReply>());
+               taskExecutorRegistration = 
Mockito.mock(TaskExecutorRegistration.class);
+               TaskExecutorGateway gateway = 
Mockito.mock(TaskExecutorGateway.class);
+               
Mockito.when(taskExecutorRegistration.getTaskExecutorGateway()).thenReturn(gateway);
+               Mockito.when(gateway.requestSlot(any(SlotID.class), 
any(AllocationID.class), any(UUID.class), any(Time.class)))
+                       .thenReturn(new 
FlinkCompletableFuture<TMSlotRequestReply>());
        }
 
        /**
@@ -180,9 +184,9 @@ public class SlotManagerTest {
                assertEquals(1, slotManager.getPendingRequestCount());
 
                SlotID slotId = SlotID.generate();
-               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
                SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
+               SlotReport slotReport = new 
SlotReport(Collections.singletonList(slotStatus));
+               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorRegistration, slotReport);
 
                assertEquals(1, slotManager.getAllocatedSlotCount());
                assertEquals(0, slotManager.getFreeSlotCount());
@@ -198,9 +202,9 @@ public class SlotManagerTest {
                TestingSlotManager slotManager = new TestingSlotManager();
 
                SlotID slotId = SlotID.generate();
-               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
                SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
+               SlotReport slotReport = new 
SlotReport(Collections.singletonList(slotStatus));
+               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorRegistration, slotReport);
 
                assertEquals(0, slotManager.getAllocatedSlotCount());
                assertEquals(1, slotManager.getFreeSlotCount());
@@ -216,9 +220,9 @@ public class SlotManagerTest {
                assertEquals(1, slotManager.getPendingRequestCount());
 
                SlotID slotId = SlotID.generate();
-               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
                SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
+               SlotReport slotReport = new 
SlotReport(Collections.singletonList(slotStatus));
+               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorRegistration, slotReport);
 
                assertEquals(0, slotManager.getAllocatedSlotCount());
                assertEquals(1, slotManager.getFreeSlotCount());
@@ -234,9 +238,9 @@ public class SlotManagerTest {
                TestingSlotManager slotManager = new TestingSlotManager();
 
                SlotID slotId = SlotID.generate();
-               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
                SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
-               slotManager.updateSlotStatus(slotStatus);
+               SlotReport slotReport = new 
SlotReport(Collections.singletonList(slotStatus));
+               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorRegistration, slotReport);
 
                assertEquals(1, slotManager.getAllocatedSlotCount());
                assertEquals(0, slotManager.getFreeSlotCount());
@@ -244,48 +248,44 @@ public class SlotManagerTest {
        }
 
        /**
-        * Tests that we had a slot in-use, and it's confirmed by SlotReport
+        * Tests that a slot which is in use gets freed again once it is reported available.
         */
        @Test
        public void testExistingInUseSlotUpdateStatus() {
                TestingSlotManager slotManager = new TestingSlotManager();
-               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request);
 
-               // make this slot in use
                SlotID slotId = SlotID.generate();
-               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
+               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
+               SlotReport slotReport = new 
SlotReport(Collections.singletonList(slotStatus));
+               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorRegistration, slotReport);
 
                assertEquals(1, slotManager.getAllocatedSlotCount());
                assertEquals(0, slotManager.getFreeSlotCount());
                assertTrue(slotManager.isAllocated(slotId));
 
-               // slot status is confirmed
-               SlotStatus slotStatus2 = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE,
-                       request.getJobId(), request.getAllocationId());
-               slotManager.updateSlotStatus(slotStatus2);
+               // slot is freed again
+               slotManager.notifySlotAvailable(slotId.getResourceID(), slotId);
 
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertTrue(slotManager.isAllocated(slotId));
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertFalse(slotManager.isAllocated(slotId));
        }
 
        /**
-        * Tests that we had a slot in-use, but it's empty according to the 
SlotReport
+        * Tests multiple slot requests with one slot.
         */
        @Test
-       public void testExistingInUseSlotAdjustedToEmpty() {
+       public void testMultipleSlotRequestsWithOneSlot() {
                TestingSlotManager slotManager = new TestingSlotManager();
-               SlotRequest request1 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
+               final AllocationID allocationID = new AllocationID();
+
+               SlotRequest request1 = new SlotRequest(new JobID(), 
allocationID, DEFAULT_TESTING_PROFILE);
                slotManager.requestSlot(request1);
 
-               // make this slot in use
-               SlotID slotId = SlotID.generate();
-               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
+               final ResourceID resourceID = ResourceID.generate();
+               final SlotStatus slotStatus = new SlotStatus(new 
SlotID(resourceID, 0), DEFAULT_TESTING_PROFILE);
+               final SlotReport slotReport = new SlotReport(slotStatus);
+               slotManager.registerTaskExecutor(resourceID, 
taskExecutorRegistration, slotReport);
 
                // another request pending
                SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
@@ -294,66 +294,20 @@ public class SlotManagerTest {
                assertEquals(1, slotManager.getAllocatedSlotCount());
                assertEquals(0, slotManager.getFreeSlotCount());
                assertEquals(1, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
+               assertTrue(slotManager.isAllocated(allocationID));
                assertTrue(slotManager.isAllocated(request1.getAllocationId()));
 
-
-               // but slot is reported empty again, request2 will be 
fulfilled, request1 will be missing
-               slotManager.updateSlotStatus(slotStatus);
+               // the slot is reported available, so the pending request2 takes it over
+               slotManager.notifySlotAvailable(resourceID, 
slotStatus.getSlotID());
 
                assertEquals(1, slotManager.getAllocatedSlotCount());
                assertEquals(0, slotManager.getFreeSlotCount());
                assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
+               assertTrue(slotManager.isAllocated(slotStatus.getSlotID()));
                assertTrue(slotManager.isAllocated(request2.getAllocationId()));
-       }
-
-       /**
-        * Tests that we had a slot in use, and it's also reported in use by 
TaskManager, but the allocation
-        * information didn't match.
-        */
-       @Test
-       public void testExistingInUseSlotWithDifferentAllocationInfo() {
-               TestingSlotManager slotManager = new TestingSlotManager();
-               SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
-               slotManager.requestSlot(request);
-
-               // make this slot in use
-               SlotID slotId = SlotID.generate();
-               slotManager.registerTaskExecutor(slotId.getResourceID(), 
taskExecutorGateway);
-               SlotStatus slotStatus = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
-               assertTrue(slotManager.isAllocated(request.getAllocationId()));
-
-               SlotStatus slotStatus2 = new SlotStatus(slotId, 
DEFAULT_TESTING_PROFILE, new JobID(), new AllocationID());
-               // update slot status with different allocation info
-               slotManager.updateSlotStatus(slotStatus2);
-
-               // original request is missing and won't be allocated
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slotId));
-               assertFalse(slotManager.isAllocated(request.getAllocationId()));
-               
assertTrue(slotManager.isAllocated(slotStatus2.getAllocationID()));
-       }
-
-       /**
-        * Tests that we had a free slot, and it's confirmed by SlotReport
-        */
-       @Test
-       public void testExistingEmptySlotUpdateStatus() {
-               TestingSlotManager slotManager = new TestingSlotManager();
-               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE, taskExecutorGateway);
-               slotManager.addFreeSlot(slot);
 
-               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
DEFAULT_TESTING_PROFILE);
-               slotManager.updateSlotStatus(slotStatus);
+               // the slot is reported available once more; nothing is pending, so it becomes free
+               slotManager.notifySlotAvailable(resourceID, 
slotStatus.getSlotID());
 
                assertEquals(0, slotManager.getAllocatedSlotCount());
                assertEquals(1, slotManager.getFreeSlotCount());
@@ -361,34 +315,12 @@ public class SlotManagerTest {
        }
 
        /**
-        * Tests that we had a free slot, and it's reported in-use by 
TaskManager
-        */
-       @Test
-       public void testExistingEmptySlotAdjustedToInUse() {
-               TestingSlotManager slotManager = new TestingSlotManager();
-               final SlotID slotID = SlotID.generate();
-               slotManager.registerTaskExecutor(slotID.getResourceID(), 
taskExecutorGateway);
-
-               ResourceSlot slot = new ResourceSlot(slotID, 
DEFAULT_TESTING_PROFILE, taskExecutorGateway);
-               slotManager.addFreeSlot(slot);
-
-               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
DEFAULT_TESTING_PROFILE,
-                       new JobID(), new AllocationID());
-               slotManager.updateSlotStatus(slotStatus);
-
-               assertEquals(1, slotManager.getAllocatedSlotCount());
-               assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(0, slotManager.getPendingRequestCount());
-               assertTrue(slotManager.isAllocated(slot.getSlotId()));
-       }
-
-       /**
         * Tests that we did some allocation but failed / rejected by 
TaskManager, request will retry
         */
        @Test
        public void testSlotAllocationFailedAtTaskManager() {
                TestingSlotManager slotManager = new TestingSlotManager();
-               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+               ResourceSlot slot = new ResourceSlot(SlotID.generate(), 
DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
                slotManager.addFreeSlot(slot);
 
                SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
@@ -409,24 +341,31 @@ public class SlotManagerTest {
 
        /**
         * Tests that we did some allocation but failed / rejected by 
TaskManager, and slot is occupied by another request
+        * This can only occur after reconnect of the TaskExecutor.
         */
        @Test
        public void testSlotAllocationFailedAtTaskManagerOccupiedByOther() {
                TestingSlotManager slotManager = new TestingSlotManager();
                final SlotID slotID = SlotID.generate();
-               slotManager.registerTaskExecutor(slotID.getResourceID(), 
taskExecutorGateway);
-
-               ResourceSlot slot = new ResourceSlot(slotID, 
DEFAULT_TESTING_PROFILE, taskExecutorGateway);
-               slotManager.addFreeSlot(slot);
+               SlotStatus slot = new SlotStatus(slotID, 
DEFAULT_TESTING_PROFILE);
+               SlotReport slotReport = new SlotReport(slot);
+               slotManager.registerTaskExecutor(slotID.getResourceID(), 
taskExecutorRegistration, slotReport);
 
                SlotRequest request = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
                slotManager.requestSlot(request);
 
-               // slot is set empty by heartbeat
-               SlotStatus slotStatus = new SlotStatus(slot.getSlotId(), 
slot.getResourceProfile());
-               slotManager.updateSlotStatus(slotStatus);
+               assertEquals(1, slotManager.getAllocatedSlotCount());
+               assertEquals(0, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
+
+               // slot is set empty by a reconnect of the TaskExecutor
+               slotManager.registerTaskExecutor(slotID.getResourceID(), 
taskExecutorRegistration, slotReport);
+
+               assertEquals(0, slotManager.getAllocatedSlotCount());
+               assertEquals(1, slotManager.getFreeSlotCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
 
-               // another request took this slot
+               // another request takes the slot
                SlotRequest request2 = new SlotRequest(new JobID(), new 
AllocationID(), DEFAULT_TESTING_PROFILE);
                slotManager.requestSlot(request2);
 
@@ -436,12 +375,12 @@ public class SlotManagerTest {
                assertFalse(slotManager.isAllocated(request.getAllocationId()));
                assertTrue(slotManager.isAllocated(request2.getAllocationId()));
 
-               // original request should be pended
-               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slot.getSlotId());
+               // original request should be retried
+               slotManager.handleSlotRequestFailedAtTaskManager(request, 
slotID);
 
                assertEquals(1, slotManager.getAllocatedSlotCount());
                assertEquals(0, slotManager.getFreeSlotCount());
-               assertEquals(1, slotManager.getPendingRequestCount());
+               assertEquals(0, slotManager.getPendingRequestCount());
                assertFalse(slotManager.isAllocated(request.getAllocationId()));
                assertTrue(slotManager.isAllocated(request2.getAllocationId()));
        }
@@ -453,10 +392,10 @@ public class SlotManagerTest {
                ResourceID resource1 = ResourceID.generate();
                ResourceID resource2 = ResourceID.generate();
 
-               ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 
1), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
-               ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 
2), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
-               ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 
1), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
-               ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 
2), DEFAULT_TESTING_PROFILE, taskExecutorGateway);
+               ResourceSlot slot11 = new ResourceSlot(new SlotID(resource1, 
1), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
+               ResourceSlot slot12 = new ResourceSlot(new SlotID(resource1, 
2), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
+               ResourceSlot slot21 = new ResourceSlot(new SlotID(resource2, 
1), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
+               ResourceSlot slot22 = new ResourceSlot(new SlotID(resource2, 
2), DEFAULT_TESTING_PROFILE, taskExecutorRegistration);
 
                slotManager.addFreeSlot(slot11);
                slotManager.addFreeSlot(slot21);
@@ -499,7 +438,7 @@ public class SlotManagerTest {
                final int freeSlotNum)
        {
                for (int i = 0; i < freeSlotNum; ++i) {
-                       slotManager.addFreeSlot(new 
ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile), 
taskExecutorGateway));
+                       slotManager.addFreeSlot(new 
ResourceSlot(SlotID.generate(), new ResourceProfile(resourceProfile), 
taskExecutorRegistration));
                }
        }
 
@@ -507,13 +446,13 @@ public class SlotManagerTest {
        //  testing classes
        // 
------------------------------------------------------------------------
 
-       private static class TestingSlotManager extends SlotManager implements 
ResourceManagerServices {
+       private static class TestingSlotManager extends SlotManager {
 
-               private final List<ResourceProfile> allocatedContainers;
+               private static TestingRmServices testingRmServices = new 
TestingRmServices();
 
                TestingSlotManager() {
-                       this.allocatedContainers = new LinkedList<>();
-                       setupResourceManagerServices(this);
+                       super(testingRmServices);
+                       testingRmServices.allocatedContainers.clear();
                }
 
                /**
@@ -552,24 +491,34 @@ public class SlotManagerTest {
                        return null;
                }
 
-               @Override
-               public void allocateResource(ResourceProfile resourceProfile) {
-                       allocatedContainers.add(resourceProfile);
+               List<ResourceProfile> getAllocatedContainers() {
+                       return testingRmServices.allocatedContainers;
                }
 
-               @Override
-               public Executor getAsyncExecutor() {
-                       return Mockito.mock(Executor.class);
-               }
 
-               @Override
-               public Executor getExecutor() {
-                       return Mockito.mock(Executor.class);
-               }
+               private static class TestingRmServices implements 
ResourceManagerServices {
 
-               List<ResourceProfile> getAllocatedContainers() {
-                       return allocatedContainers;
-               }
+                       private List<ResourceProfile> allocatedContainers;
+
+                       public TestingRmServices() {
+                               this.allocatedContainers = new LinkedList<>();
+                       }
+
+                       @Override
+                       public void allocateResource(ResourceProfile 
resourceProfile) {
+                               allocatedContainers.add(resourceProfile);
+                       }
 
+                       @Override
+                       public Executor getAsyncExecutor() {
+                               return Mockito.mock(Executor.class);
+                       }
+
+                       @Override
+                       public Executor getMainThreadExecutor() {
+                               return Mockito.mock(Executor.class);
+                       }
+
+               }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5915613d/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
index a87fe42..24d959e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java
@@ -25,12 +25,20 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.jobmaster.JobMasterGateway;
 import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
 import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.registration.RegistrationResponse;
-import org.apache.flink.runtime.resourcemanager.*;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerServices;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.resourcemanager.TestingResourceManager;
+import org.apache.flink.runtime.resourcemanager.TestingSlotManager;
+import 
org.apache.flink.runtime.resourcemanager.messages.jobmanager.RMSlotRequestReply;
+import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
+import 
org.apache.flink.runtime.resourcemanager.registration.TaskExecutorRegistration;
+import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskexecutor.SlotReport;
 import org.apache.flink.runtime.taskexecutor.SlotStatus;
@@ -94,9 +102,9 @@ public class SlotProtocolTest extends TestLogger {
                TestingLeaderElectionService rmLeaderElectionService =
                        configureHA(testingHaServices, jobID, rmAddress, 
rmLeaderID, jmAddress, jmLeaderID);
 
-               SlotManager slotManager = Mockito.spy(new SimpleSlotManager());
-               ResourceManager resourceManager =
-                       Mockito.spy(new 
StandaloneResourceManager(testRpcService, testingHaServices, slotManager));
+               final TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
+               SpiedResourceManager resourceManager =
+                       new SpiedResourceManager(testRpcService, 
testingHaServices, slotManagerFactory);
                resourceManager.start();
                rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -108,11 +116,13 @@ public class SlotProtocolTest extends TestLogger {
                        Assert.fail("JobManager registration Future didn't 
become ready.");
                }
 
+               final SlotManager slotManager = slotManagerFactory.slotManager;
+
                final AllocationID allocationID = new AllocationID();
                final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 100);
 
                SlotRequest slotRequest = new SlotRequest(jobID, allocationID, 
resourceProfile);
-               SlotRequestReply slotRequestReply =
+               RMSlotRequestReply slotRequestReply =
                        resourceManager.requestSlot(jmLeaderID, rmLeaderID, 
slotRequest);
 
                // 1) SlotRequest is routed to the SlotManager
@@ -124,15 +134,18 @@ public class SlotProtocolTest extends TestLogger {
                        allocationID);
 
                // 3) SlotRequest leads to a container allocation
-               verify(resourceManager, 
timeout(5000)).startNewWorker(resourceProfile);
+               Assert.assertEquals(1, resourceManager.startNewWorkerCalled);
 
                Assert.assertFalse(slotManager.isAllocated(allocationID));
 
                // slot becomes available
                final String tmAddress = "/tm1";
                TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               
Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), 
any(UUID.class), any(Time.class)))
-                       .thenReturn(new 
FlinkCompletableFuture<SlotRequestReply>());
+               Mockito
+                       .when(
+                               taskExecutorGateway
+                                       .requestSlot(any(SlotID.class), 
any(AllocationID.class), any(UUID.class), any(Time.class)))
+                       .thenReturn(new 
FlinkCompletableFuture<TMSlotRequestReply>());
                testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
                final ResourceID resourceID = ResourceID.generate();
@@ -141,13 +154,14 @@ public class SlotProtocolTest extends TestLogger {
                final SlotStatus slotStatus =
                        new SlotStatus(slotID, resourceProfile);
                final SlotReport slotReport =
-                       new SlotReport(Collections.singletonList(slotStatus), 
resourceID);
+                       new SlotReport(Collections.singletonList(slotStatus));
                // register slot at SlotManager
-               slotManager.registerTaskExecutor(resourceID, 
taskExecutorGateway);
-               slotManager.updateSlotStatus(slotReport);
+               slotManager.registerTaskExecutor(
+                       resourceID, new 
TaskExecutorRegistration(taskExecutorGateway), slotReport);
 
                // 4) Slot becomes available and TaskExecutor gets a SlotRequest
-               verify(taskExecutorGateway, 
timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class));
+               verify(taskExecutorGateway, timeout(5000))
+                       .requestSlot(eq(slotID), eq(allocationID), 
any(UUID.class), any(Time.class));
        }
 
        /**
@@ -173,13 +187,15 @@ public class SlotProtocolTest extends TestLogger {
                        configureHA(testingHaServices, jobID, rmAddress, 
rmLeaderID, jmAddress, jmLeaderID);
 
                TaskExecutorGateway taskExecutorGateway = 
mock(TaskExecutorGateway.class);
-               
Mockito.when(taskExecutorGateway.requestSlot(any(AllocationID.class), 
any(UUID.class), any(Time.class)))
-                       .thenReturn(new 
FlinkCompletableFuture<SlotRequestReply>());
+               Mockito.when(
+                       taskExecutorGateway
+                               .requestSlot(any(SlotID.class), 
any(AllocationID.class), any(UUID.class), any(Time.class)))
+                       .thenReturn(new 
FlinkCompletableFuture<TMSlotRequestReply>());
                testRpcService.registerGateway(tmAddress, taskExecutorGateway);
 
-               SlotManager slotManager = Mockito.spy(new SimpleSlotManager());
-               ResourceManager resourceManager =
-                       Mockito.spy(new 
StandaloneResourceManager(testRpcService, testingHaServices, slotManager));
+               TestingSlotManagerFactory slotManagerFactory = new 
TestingSlotManagerFactory();
+               TestingResourceManager resourceManager =
+                       Mockito.spy(new TestingResourceManager(testRpcService, 
testingHaServices, slotManagerFactory));
                resourceManager.start();
                rmLeaderElectionService.isLeader(rmLeaderID);
 
@@ -191,6 +207,8 @@ public class SlotProtocolTest extends TestLogger {
                        Assert.fail("JobManager registration Future didn't 
become ready.");
                }
 
+               final SlotManager slotManager = slotManagerFactory.slotManager;
+
                final ResourceID resourceID = ResourceID.generate();
                final AllocationID allocationID = new AllocationID();
                final ResourceProfile resourceProfile = new 
ResourceProfile(1.0, 100);
@@ -199,13 +217,13 @@ public class SlotProtocolTest extends TestLogger {
                final SlotStatus slotStatus =
                        new SlotStatus(slotID, resourceProfile);
                final SlotReport slotReport =
-                       new SlotReport(Collections.singletonList(slotStatus), 
resourceID);
+                       new SlotReport(Collections.singletonList(slotStatus));
                // register slot at SlotManager
-               slotManager.registerTaskExecutor(resourceID, 
taskExecutorGateway);
-               slotManager.updateSlotStatus(slotReport);
+               slotManager.registerTaskExecutor(
+                       resourceID, new 
TaskExecutorRegistration(taskExecutorGateway), slotReport);
 
                SlotRequest slotRequest = new SlotRequest(jobID, allocationID, 
resourceProfile);
-               SlotRequestReply slotRequestReply =
+               RMSlotRequestReply slotRequestReply =
                        resourceManager.requestSlot(jmLeaderID, rmLeaderID, 
slotRequest);
 
                // 1) a SlotRequest is routed to the SlotManager
@@ -220,9 +238,9 @@ public class SlotProtocolTest extends TestLogger {
                Assert.assertTrue(slotManager.isAllocated(slotID));
                Assert.assertTrue(slotManager.isAllocated(allocationID));
 
-
                // 4) a SlotRequest is routed to the TaskExecutor
-               verify(taskExecutorGateway, 
timeout(5000)).requestSlot(eq(allocationID), any(UUID.class), any(Time.class));
+               verify(taskExecutorGateway, timeout(5000))
+                       .requestSlot(eq(slotID), eq(allocationID), 
any(UUID.class), any(Time.class));
        }
 
        private static TestingLeaderElectionService configureHA(
@@ -240,4 +258,32 @@ public class SlotProtocolTest extends TestLogger {
                return rmLeaderElectionService;
        }
 
+       private static class SpiedResourceManager extends 
TestingResourceManager {
+
+               private int startNewWorkerCalled = 0;
+
+               public SpiedResourceManager(
+                               RpcService rpcService,
+                               HighAvailabilityServices 
highAvailabilityServices,
+                               SlotManagerFactory slotManagerFactory) {
+                       super(rpcService, highAvailabilityServices, 
slotManagerFactory);
+               }
+
+
+               @Override
+               public void startNewWorker(ResourceProfile resourceProfile) {
+                       startNewWorkerCalled++;
+               }
+       }
+
+       private static class TestingSlotManagerFactory implements 
SlotManagerFactory {
+
+               private SlotManager slotManager;
+
+               @Override
+               public SlotManager create(ResourceManagerServices rmServices) {
+                       this.slotManager = Mockito.spy(new 
TestingSlotManager(rmServices));
+                       return this.slotManager;
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5915613d/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index 9c1f288..7710fa9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -19,7 +19,9 @@
 package org.apache.flink.runtime.taskexecutor;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.highavailability.NonHaServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -28,11 +30,15 @@ import 
org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.MetricRegistry;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRegistered;
+import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestRejected;
+import 
org.apache.flink.runtime.resourcemanager.messages.taskexecutor.TMSlotRequestReply;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
 import org.apache.flink.runtime.rpc.TestingSerialRpcService;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
 import org.junit.Test;
 
 import org.powermock.api.mockito.PowerMockito;
@@ -76,7 +82,7 @@ public class TaskExecutorTest extends TestLogger {
                        String taskManagerAddress = taskManager.getAddress();
 
                        verify(rmGateway).registerTaskExecutor(
-                                       any(UUID.class), 
eq(taskManagerAddress), eq(resourceID), any(Time.class));
+                                       any(UUID.class), 
eq(taskManagerAddress), eq(resourceID), any(SlotReport.class), any(Time.class));
                }
                finally {
                        rpc.stopService();
@@ -132,7 +138,7 @@ public class TaskExecutorTest extends TestLogger {
                        testLeaderService.notifyListener(address1, leaderId1);
 
                        verify(rmGateway1).registerTaskExecutor(
-                                       eq(leaderId1), eq(taskManagerAddress), 
eq(resourceID), any(Time.class));
+                                       eq(leaderId1), eq(taskManagerAddress), 
eq(resourceID), any(SlotReport.class), any(Time.class));
                        
assertNotNull(taskManager.getResourceManagerConnection());
 
                        // cancel the leader 
@@ -142,11 +148,95 @@ public class TaskExecutorTest extends TestLogger {
                        testLeaderService.notifyListener(address2, leaderId2);
 
                        verify(rmGateway2).registerTaskExecutor(
-                                       eq(leaderId2), eq(taskManagerAddress), 
eq(resourceID), any(Time.class));
+                                       eq(leaderId2), eq(taskManagerAddress), 
eq(resourceID), any(SlotReport.class), any(Time.class));
                        
assertNotNull(taskManager.getResourceManagerConnection());
                }
                finally {
                        rpc.stopService();
                }
        }
+
+       /**
+        * Tests that all allocation requests for slots are rejected if the slot 
has been reported as
+        * free by the TaskExecutor but this report hasn't been confirmed by 
the ResourceManager.
+        *
+        * This is essential for the correctness of the state of the 
ResourceManager.
+        */
+       @Test
+       public void testRejectAllocationRequestsForOutOfSyncSlots() {
+               final ResourceID resourceID = ResourceID.generate();
+
+               final String address1 = "/resource/manager/address/one";
+               final UUID leaderId = UUID.randomUUID();
+
+               final TestingSerialRpcService rpc = new 
TestingSerialRpcService();
+               try {
+                       // register the mock resource manager gateways
+                       ResourceManagerGateway rmGateway1 = 
mock(ResourceManagerGateway.class);
+                       rpc.registerGateway(address1, rmGateway1);
+
+                       TestingLeaderRetrievalService testLeaderService = new 
TestingLeaderRetrievalService();
+
+                       TestingHighAvailabilityServices haServices = new 
TestingHighAvailabilityServices();
+                       
haServices.setResourceManagerLeaderRetriever(testLeaderService);
+
+                       TaskManagerConfiguration 
taskManagerServicesConfiguration = mock(TaskManagerConfiguration.class);
+                       
PowerMockito.when(taskManagerServicesConfiguration.getNumberSlots()).thenReturn(1);
+
+                       TaskManagerLocation taskManagerLocation = 
mock(TaskManagerLocation.class);
+                       
when(taskManagerLocation.getResourceID()).thenReturn(resourceID);
+
+                       TaskExecutor taskManager = new TaskExecutor(
+                               taskManagerServicesConfiguration,
+                               taskManagerLocation,
+                               rpc,
+                               mock(MemoryManager.class),
+                               mock(IOManager.class),
+                               mock(NetworkEnvironment.class),
+                               haServices,
+                               mock(MetricRegistry.class),
+                               mock(FatalErrorHandler.class));
+
+                       taskManager.start();
+                       String taskManagerAddress = taskManager.getAddress();
+
+                       // no connection initially, since there is no leader
+                       assertNull(taskManager.getResourceManagerConnection());
+
+                       // define a leader and see that a registration happens
+                       testLeaderService.notifyListener(address1, leaderId);
+
+                       verify(rmGateway1).registerTaskExecutor(
+                               eq(leaderId), eq(taskManagerAddress), 
eq(resourceID), any(SlotReport.class), any(Time.class));
+                       
assertNotNull(taskManager.getResourceManagerConnection());
+
+                       // test that allocating a slot works
+                       final SlotID slotID = new SlotID(resourceID, 0);
+                       TMSlotRequestReply tmSlotRequestReply = 
taskManager.requestSlot(slotID, new AllocationID(), leaderId);
+                       assertTrue(tmSlotRequestReply instanceof 
TMSlotRequestRegistered);
+
+                       // test that we can't allocate slots which are 
blacklisted due to pending confirmation of the RM
+                       final SlotID unconfirmedFreeSlotID = new 
SlotID(resourceID, 1);
+                       
taskManager.addUnconfirmedFreeSlotNotification(unconfirmedFreeSlotID);
+                       TMSlotRequestReply tmSlotRequestReply2 =
+                               taskManager.requestSlot(unconfirmedFreeSlotID, 
new AllocationID(), leaderId);
+                       assertTrue(tmSlotRequestReply2 instanceof 
TMSlotRequestRejected);
+
+                       // re-register
+                       verify(rmGateway1).registerTaskExecutor(
+                               eq(leaderId), eq(taskManagerAddress), 
eq(resourceID), any(SlotReport.class), any(Time.class));
+                       testLeaderService.notifyListener(address1, leaderId);
+
+                       // now we should be successful because the slot status 
has been synced
+                       // test that the previously blacklisted slot can now be 
allocated again
+                       TMSlotRequestReply tmSlotRequestReply3 =
+                               taskManager.requestSlot(unconfirmedFreeSlotID, 
new AllocationID(), leaderId);
+                       assertTrue(tmSlotRequestReply3 instanceof 
TMSlotRequestRegistered);
+
+               }
+               finally {
+                       rpc.stopService();
+               }
+
+       }
 }

Reply via email to