[FLINK-8653] [flip6] Remove internal slot request timeout from SlotPool

Instead of using the internal slot request timeout to time out pending slot requests,
we use the timeout passed to SlotPool#allocateSlot to time out pending slot requests.

This closes #5483.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8a88669
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8a88669
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8a88669

Branch: refs/heads/master
Commit: d8a8866973f7e0463047963e6b242cdc2cb82fec
Parents: d9d89ff
Author: Till Rohrmann <[email protected]>
Authored: Wed Feb 14 12:09:01 2018 +0100
Committer: Till Rohrmann <[email protected]>
Committed: Sun Feb 18 10:12:54 2018 +0100

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      |  1 -
 .../runtime/jobmaster/slotpool/SlotPool.java    | 89 ++++++++++----------
 .../jobmaster/slotpool/SlotPoolRpcTest.java     | 72 ++++++----------
 .../jobmaster/slotpool/SlotPoolTest.java        | 34 +-------
 4 files changed, 74 insertions(+), 122 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d8a88669/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 139c053..dfa4d1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -276,7 +276,6 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        jobGraph.getJobID(),
                        SystemClock.getInstance(),
                        rpcTimeout,
-                       jobMasterConfiguration.getSlotRequestTimeout(),
                        jobMasterConfiguration.getSlotIdleTimeout());
 
                this.slotPoolGateway = 
slotPool.getSelfGateway(SlotPoolGateway.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a88669/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
index 72bb7e1..6d714e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java
@@ -67,6 +67,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
@@ -111,9 +112,6 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
        /** Timeout for external request calls (e.g. to the ResourceManager or 
the TaskExecutor). */
        private final Time rpcTimeout;
 
-       /** Timeout for allocation round trips (RM -> launch TM -> offer slot). 
*/
-       private final Time slotRequestTimeout;
-
        /** Timeout for releasing idle slots. */
        private final Time idleSlotTimeout;
 
@@ -139,7 +137,6 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        jobId,
                        SystemClock.getInstance(),
                        AkkaUtils.getDefaultTimeout(),
-                       
Time.milliseconds(JobManagerOptions.SLOT_REQUEST_TIMEOUT.defaultValue()),
                        
Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue()));
        }
 
@@ -148,7 +145,6 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        JobID jobId,
                        Clock clock,
                        Time rpcTimeout,
-                       Time slotRequestTimeout,
                        Time idleSlotTimeout) {
 
                super(rpcService);
@@ -157,7 +153,6 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                this.clock = checkNotNull(clock);
                this.rpcTimeout = checkNotNull(rpcTimeout);
                this.idleSlotTimeout = checkNotNull(idleSlotTimeout);
-               this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
 
                this.registeredTaskManagers = new HashSet<>(16);
                this.allocatedSlots = new AllocatedSlots();
@@ -307,7 +302,8 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        scheduledUnit,
                        resourceProfile,
                        locationPreferences,
-                       allowQueuedScheduling);
+                       allowQueuedScheduling,
+                       timeout);
        }
 
        private CompletableFuture<LogicalSlot> internalAllocateSlot(
@@ -315,7 +311,8 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        ScheduledUnit task,
                        ResourceProfile resourceProfile,
                        Collection<TaskManagerLocation> locationPreferences,
-                       boolean allowQueuedScheduling) {
+                       boolean allowQueuedScheduling,
+                       Time allocationTimeout) {
 
                final SlotSharingGroupId slotSharingGroupId = 
task.getSlotSharingGroupId();
 
@@ -337,13 +334,15 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                                multiTaskSlotManager,
                                                resourceProfile,
                                                locationPreferences,
-                                               allowQueuedScheduling);
+                                               allowQueuedScheduling,
+                                               allocationTimeout);
                                } else {
                                        multiTaskSlotLocality = 
allocateMultiTaskSlot(
                                                task.getJobVertexId(), 
multiTaskSlotManager,
                                                resourceProfile,
                                                locationPreferences,
-                                               allowQueuedScheduling);
+                                               allowQueuedScheduling,
+                                               allocationTimeout);
                                }
                        } catch (NoResourceAvailableException 
noResourceException) {
                                return 
FutureUtils.completedExceptionally(noResourceException);
@@ -364,7 +363,8 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                slotRequestId,
                                resourceProfile,
                                locationPreferences,
-                               allowQueuedScheduling);
+                               allowQueuedScheduling,
+                               allocationTimeout);
 
                        return slotAndLocalityFuture.thenApply(
                                (SlotAndLocality slotAndLocality) -> {
@@ -399,6 +399,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
         * @param resourceProfile specifying the requirements for the requested 
slot
         * @param locationPreferences containing preferred TaskExecutors on 
which to allocate the slot
         * @param allowQueuedScheduling true if queued scheduling (the returned 
task slot must not be completed yet) is allowed, otherwise false
+        * @param allocationTimeout timeout before the slot allocation times out
         * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which 
contains the allocated{@link SlotSharingManager.MultiTaskSlot}
         *              and its locality wrt the given location preferences
         * @throws NoResourceAvailableException if no task slot could be 
allocated
@@ -408,7 +409,8 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        SlotSharingManager multiTaskSlotManager,
                        ResourceProfile resourceProfile,
                        Collection<TaskManagerLocation> locationPreferences,
-                       boolean allowQueuedScheduling) throws 
NoResourceAvailableException {
+                       boolean allowQueuedScheduling,
+                       Time allocationTimeout) throws 
NoResourceAvailableException {
                final SlotRequestId coLocationSlotRequestId = 
coLocationConstraint.getSlotRequestId();
 
                if (coLocationSlotRequestId != null) {
@@ -437,7 +439,8 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        coLocationConstraint.getGroupId(), multiTaskSlotManager,
                        resourceProfile,
                        actualLocationPreferences,
-                       allowQueuedScheduling);
+                       allowQueuedScheduling,
+                       allocationTimeout);
 
                // check whether we fulfill the co-location constraint
                if (coLocationConstraint.isAssigned() && 
multiTaskSlotLocality.getLocality() != Locality.LOCAL) {
@@ -493,6 +496,7 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
         * @param resourceProfile specifying the requirements for the requested 
slot
         * @param locationPreferences containing preferred TaskExecutors on 
which to allocate the slot
         * @param allowQueuedScheduling true if queued scheduling (the returned 
task slot must not be completed yet) is allowed, otherwise false
+        * @param allocationTimeout timeout before the slot allocation times out
         * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which 
contains the allocated {@link SlotSharingManager.MultiTaskSlot}
         *              and its locality wrt the given location preferences
         * @throws NoResourceAvailableException if no task slot could be 
allocated
@@ -502,7 +506,8 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        SlotSharingManager slotSharingManager,
                        ResourceProfile resourceProfile,
                        Collection<TaskManagerLocation> locationPreferences,
-                       boolean allowQueuedScheduling) throws 
NoResourceAvailableException {
+                       boolean allowQueuedScheduling,
+                       Time allocationTimeout) throws 
NoResourceAvailableException {
 
                // check first whether we have a resolved root slot which we 
can use
                SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality 
= slotSharingManager.getResolvedRootSlot(
@@ -552,7 +557,10 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
 
                        if (multiTaskSlotFuture == null) {
                                // it seems as if we have to request a new slot 
from the resource manager, this is always the last resort!!!
-                               final CompletableFuture<AllocatedSlot> 
futureSlot = requestNewAllocatedSlot(allocatedSlotRequestId, resourceProfile);
+                               final CompletableFuture<AllocatedSlot> 
futureSlot = requestNewAllocatedSlot(
+                                       allocatedSlotRequestId,
+                                       resourceProfile,
+                                       allocationTimeout);
 
                                multiTaskSlotFuture = 
slotSharingManager.createRootSlot(
                                        multiTaskSlotRequestId,
@@ -597,13 +605,15 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
         * @param resourceProfile which the allocated slot should fulfill
         * @param locationPreferences for the allocated slot
         * @param allowQueuedScheduling true if the slot allocation can be 
completed in the future
+        * @param allocationTimeout timeout before the slot allocation times out
         * @return Future containing the allocated simple slot
         */
        private CompletableFuture<SlotAndLocality> requestAllocatedSlot(
                        SlotRequestId slotRequestId,
                        ResourceProfile resourceProfile,
                        Collection<TaskManagerLocation> locationPreferences,
-                       boolean allowQueuedScheduling) {
+                       boolean allowQueuedScheduling,
+                       Time allocationTimeout) {
 
                final CompletableFuture<SlotAndLocality> 
allocatedSlotLocalityFuture;
 
@@ -616,7 +626,8 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        // we have to request a new allocated slot
                        CompletableFuture<AllocatedSlot> allocatedSlotFuture = 
requestNewAllocatedSlot(
                                slotRequestId,
-                               resourceProfile);
+                               resourceProfile,
+                               allocationTimeout);
 
                        allocatedSlotLocalityFuture = 
allocatedSlotFuture.thenApply((AllocatedSlot allocatedSlot) -> new 
SlotAndLocality(allocatedSlot, Locality.UNKNOWN));
                } else {
@@ -634,16 +645,29 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
         *
         * @param slotRequestId identifying the requested slot
         * @param resourceProfile which the requested slot should fulfill
+        * @param allocationTimeout timeout before the slot allocation times out
         * @return An {@link AllocatedSlot} future which is completed once the 
slot is offered to the {@link SlotPool}
         */
        private CompletableFuture<AllocatedSlot> requestNewAllocatedSlot(
-               SlotRequestId slotRequestId,
-               ResourceProfile resourceProfile) {
+                       SlotRequestId slotRequestId,
+                       ResourceProfile resourceProfile,
+                       Time allocationTimeout) {
 
                final PendingRequest pendingRequest = new PendingRequest(
                        slotRequestId,
                        resourceProfile);
 
+               // register request timeout
+               FutureUtils
+                       .orTimeout(pendingRequest.getAllocatedSlotFuture(), 
allocationTimeout.toMilliseconds(), TimeUnit.MILLISECONDS)
+                       .whenCompleteAsync(
+                               (AllocatedSlot ignored, Throwable throwable) -> 
{
+                                       if (throwable != null) {
+                                               
removePendingRequest(slotRequestId);
+                                       }
+                               },
+                               getMainThreadExecutor());
+
                if (resourceManagerGateway == null) {
                        stashRequestWaitingForResourceManager(pendingRequest);
                } else {
@@ -678,15 +702,9 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        new SlotRequest(jobId, allocationId, 
pendingRequest.getResourceProfile(), jobManagerAddress),
                        rpcTimeout);
 
-               CompletableFuture<Void> slotRequestProcessingFuture = 
rmResponse.thenAcceptAsync(
-                       (Acknowledge value) -> {
-                               
slotRequestToResourceManagerSuccess(pendingRequest.getSlotRequestId());
-                       },
-                       getMainThreadExecutor());
-
                // on failure, fail the request future
-               slotRequestProcessingFuture.whenCompleteAsync(
-                       (Void v, Throwable failure) -> {
+               rmResponse.whenCompleteAsync(
+                       (Acknowledge ignored, Throwable failure) -> {
                                if (failure != null) {
                                        
slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure);
                                }
@@ -694,12 +712,6 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                        getMainThreadExecutor());
        }
 
-       private void slotRequestToResourceManagerSuccess(final SlotRequestId 
requestId) {
-               // a request is pending from the ResourceManager to a (future) 
TaskManager
-               // we only add the watcher here in case that request times out
-               scheduleRunAsync(() -> checkTimeoutSlotAllocation(requestId), 
slotRequestTimeout);
-       }
-
        private void slotRequestToResourceManagerFailed(SlotRequestId 
slotRequestID, Throwable failure) {
                PendingRequest request = 
pendingRequests.removeKeyA(slotRequestID);
                if (request != null) {
@@ -727,17 +739,6 @@ public class SlotPool extends RpcEndpoint implements 
SlotPoolGateway, AllocatedS
                                "Adding as pending request {}",  
pendingRequest.getSlotRequestId());
 
                
waitingForResourceManager.put(pendingRequest.getSlotRequestId(), 
pendingRequest);
-
-               scheduleRunAsync(() -> 
checkTimeoutRequestWaitingForResourceManager(pendingRequest.getSlotRequestId()),
 slotRequestTimeout);
-       }
-
-       private void checkTimeoutRequestWaitingForResourceManager(SlotRequestId 
slotRequestId) {
-               PendingRequest request = 
waitingForResourceManager.remove(slotRequestId);
-               if (request != null) {
-                       failPendingRequest(
-                               request,
-                               new TimeoutException("No slot available and no 
connection to Resource Manager established."));
-               }
        }
 
        // 
------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a88669/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
index bf8037d..a9be9cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
@@ -50,7 +51,6 @@ import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorSystem;
-import akka.pattern.AskTimeoutException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -66,7 +66,6 @@ import java.util.function.Consumer;
 
 import static 
org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -80,6 +79,8 @@ public class SlotPoolRpcTest extends TestLogger {
 
        private static final Time timeout = Time.seconds(10L);
 
+       private static final Time fastTimeout = Time.milliseconds(1L);
+
        // 
------------------------------------------------------------------------
        //  setup
        // 
------------------------------------------------------------------------
@@ -111,7 +112,6 @@ public class SlotPoolRpcTest extends TestLogger {
                        jid,
                        SystemClock.getInstance(),
                        TestingUtils.infiniteTime(),
-                       Time.milliseconds(10L), // this is the timeout for the 
request tested here
                        TestingUtils.infiniteTime()
                );
 
@@ -124,7 +124,7 @@ public class SlotPoolRpcTest extends TestLogger {
                                DEFAULT_TESTING_PROFILE,
                                Collections.emptyList(),
                                true,
-                               TestingUtils.infiniteTime());
+                               fastTimeout);
 
                        try {
                                future.get();
@@ -146,7 +146,6 @@ public class SlotPoolRpcTest extends TestLogger {
                        jid,
                        SystemClock.getInstance(),
                        TestingUtils.infiniteTime(),
-                       TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime());
 
                try {
@@ -160,27 +159,26 @@ public class SlotPoolRpcTest extends TestLogger {
                                DEFAULT_TESTING_PROFILE,
                                Collections.emptyList(),
                                true,
-                               Time.milliseconds(10L));
+                               fastTimeout);
 
                        try {
                                future.get();
-                               fail("We expected a AskTimeoutException.");
+                               fail("We expected a TimeoutException.");
                        } catch (ExecutionException e) {
-                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
AskTimeoutException);
+                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
TimeoutException);
                        }
 
-                       assertEquals(1L, (long) 
pool.getNumberOfWaitingForResourceRequests().get());
-
-                       slotPoolGateway.releaseSlot(requestId, null, 
null).get();
-
                        assertEquals(0L, (long) 
pool.getNumberOfWaitingForResourceRequests().get());
                } finally {
                        RpcUtils.terminateRpcEndpoint(pool, timeout);
                }
        }
 
+       /**
+        * Tests that a slot allocation times out wrt to the specified time out.
+        */
        @Test
-       public void testCancelSlotAllocationWithResourceManager() throws 
Exception {
+       public void testSlotAllocationTimeout() throws Exception {
                final JobID jid = new JobID();
 
                final TestingSlotPool pool = new TestingSlotPool(
@@ -188,7 +186,6 @@ public class SlotPoolRpcTest extends TestLogger {
                        jid,
                        SystemClock.getInstance(),
                        TestingUtils.infiniteTime(),
-                       TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime());
 
                try {
@@ -201,22 +198,19 @@ public class SlotPoolRpcTest extends TestLogger {
                        SlotRequestId requestId = new SlotRequestId();
                        CompletableFuture<LogicalSlot> future = 
slotPoolGateway.allocateSlot(
                                requestId,
-                               new 
ScheduledUnit(SchedulerTestUtils.getDummyTask()),
+                               new DummyScheduledUnit(),
                                DEFAULT_TESTING_PROFILE,
                                Collections.emptyList(),
                                true,
-                               Time.milliseconds(10L));
+                               fastTimeout);
 
                        try {
                                future.get();
-                               fail("We expected a AskTimeoutException.");
+                               fail("We expected a TimeoutException.");
                        } catch (ExecutionException e) {
-                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
AskTimeoutException);
+                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
TimeoutException);
                        }
 
-                       assertEquals(1L, (long) 
pool.getNumberOfPendingRequests().get());
-
-                       slotPoolGateway.releaseSlot(requestId, null, 
null).get();
                        assertEquals(0L, (long) 
pool.getNumberOfPendingRequests().get());
                } finally {
                        RpcUtils.terminateRpcEndpoint(pool, timeout);
@@ -224,10 +218,10 @@ public class SlotPoolRpcTest extends TestLogger {
        }
 
        /**
-        * Tests that allocated slots are not cancelled.
+        * Tests that extra slots are kept by the {@link SlotPool}.
         */
        @Test
-       public void testCancelSlotAllocationWhileSlotFulfilled() throws 
Exception {
+       public void testExtraSlotsAreKept() throws Exception {
                final JobID jid = new JobID();
 
                final TestingSlotPool pool = new TestingSlotPool(
@@ -235,7 +229,6 @@ public class SlotPoolRpcTest extends TestLogger {
                        jid,
                        SystemClock.getInstance(),
                        TestingUtils.infiniteTime(),
-                       TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime());
 
                try {
@@ -257,15 +250,17 @@ public class SlotPoolRpcTest extends TestLogger {
                                DEFAULT_TESTING_PROFILE,
                                Collections.emptyList(),
                                true,
-                               Time.milliseconds(10L));
+                               fastTimeout);
 
                        try {
                                future.get();
-                               fail("We expected a AskTimeoutException.");
+                               fail("We expected a TimeoutException.");
                        } catch (ExecutionException e) {
-                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
AskTimeoutException);
+                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
TimeoutException);
                        }
 
+                       assertEquals(0L, (long) 
pool.getNumberOfPendingRequests().get());
+
                        AllocationID allocationId = allocationIdFuture.get();
                        final SlotOffer slotOffer = new SlotOffer(
                                allocationId,
@@ -278,13 +273,6 @@ public class SlotPoolRpcTest extends TestLogger {
 
                        
assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, 
slotOffer).get());
 
-                       assertEquals(0L, (long) 
pool.getNumberOfPendingRequests().get());
-
-                       
assertTrue(pool.containsAllocatedSlot(allocationId).get());
-
-                       pool.releaseSlot(requestId, null, null).get();
-
-                       
assertFalse(pool.containsAllocatedSlot(allocationId).get());
                        
assertTrue(pool.containsAvailableSlot(allocationId).get());
                } finally {
                        RpcUtils.terminateRpcEndpoint(pool, timeout);
@@ -296,7 +284,7 @@ public class SlotPoolRpcTest extends TestLogger {
         * it will automatically call cancelSlotAllocation as will inject 
future.whenComplete in ProviderAndOwner.
         */
        @Test
-       public void testProviderAndOwner() throws Exception {
+       public void testProviderAndOwnerSlotAllocationTimeout() throws 
Exception {
                final JobID jid = new JobID();
 
                final TestingSlotPool pool = new TestingSlotPool(
@@ -304,7 +292,6 @@ public class SlotPoolRpcTest extends TestLogger {
                        jid,
                        SystemClock.getInstance(),
                        TestingUtils.infiniteTime(),
-                       TestingUtils.infiniteTime(),
                        TestingUtils.infiniteTime());
 
                final CompletableFuture<SlotRequestId> releaseSlotFuture = new 
CompletableFuture<>();
@@ -317,20 +304,18 @@ public class SlotPoolRpcTest extends TestLogger {
                        ResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();
                        pool.connectToResourceManager(resourceManagerGateway);
 
-                       ScheduledUnit mockScheduledUnit = new 
ScheduledUnit(SchedulerTestUtils.getDummyTask());
-
                        // test the pending request is clear when timed out
                        CompletableFuture<LogicalSlot> future = 
pool.getSlotProvider().allocateSlot(
-                               mockScheduledUnit,
+                               new DummyScheduledUnit(),
                                true,
                                Collections.emptyList(),
-                               Time.milliseconds(1L));
+                               fastTimeout);
 
                        try {
                                future.get();
-                               fail("We expected a AskTimeoutException.");
+                               fail("We expected a TimeoutException.");
                        } catch (ExecutionException e) {
-                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
AskTimeoutException);
+                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
TimeoutException);
                        }
 
                        // wait for the cancel call on the SlotPool
@@ -353,14 +338,13 @@ public class SlotPoolRpcTest extends TestLogger {
                                RpcService rpcService,
                                JobID jobId,
                                Clock clock,
-                               Time slotRequestTimeout,
                                Time rpcTimeout,
                                Time idleSlotTimeout) {
                        super(
                                rpcService,
                                jobId,
                                clock,
-                               rpcTimeout, slotRequestTimeout,
+                               rpcTimeout,
                                idleSlotTimeout);
 
                        releaseSlotConsumer = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/d8a88669/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
index 6a85461..e6446ad 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java
@@ -66,7 +66,6 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 
 import static 
org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -587,7 +586,7 @@ public class SlotPoolTest extends TestLogger {
                        rpcService,
                        jobId,
                        clock,
-                       TestingUtils.infiniteTime(), 
TestingUtils.infiniteTime(),
+                       TestingUtils.infiniteTime(),
                        timeout);
 
                try {
@@ -628,37 +627,6 @@ public class SlotPoolTest extends TestLogger {
                }
        }
 
-       /**
-        * Tests that a slot allocation times out wrt to the specified time out.
-        */
-       @Test
-       public void testSlotAllocationTimeout() throws Exception {
-               final SlotPool slotPool = new SlotPool(rpcService, jobId);
-
-               final Time allocationTimeout = Time.milliseconds(1L);
-
-               try {
-                       setupSlotPool(slotPool, resourceManagerGateway);
-
-                       final SlotProvider slotProvider = 
slotPool.getSlotProvider();
-
-                       final CompletableFuture<LogicalSlot> allocationFuture = 
slotProvider.allocateSlot(
-                               new DummyScheduledUnit(),
-                               true,
-                               Collections.emptyList(),
-                               allocationTimeout);
-
-                       try {
-                               allocationFuture.get();
-                               fail("Should have failed with a timeout 
exception.");
-                       } catch (ExecutionException ee) {
-                               
assertThat(ExceptionUtils.stripExecutionException(ee), 
Matchers.instanceOf(TimeoutException.class));
-                       }
-               } finally {
-                       RpcUtils.terminateRpcEndpoint(slotPool, timeout);
-               }
-       }
-
        private static SlotPoolGateway setupSlotPool(
                        SlotPool slotPool,
                        ResourceManagerGateway resourceManagerGateway) throws 
Exception {

Reply via email to