[FLINK-8653] [flip6] Remove internal slot request timeout from SlotPool

Instead of using the internal slot request timeout to time out pending slot requests, we use the timeout passed to SlotPool#allocateSlot to time out pending slot requests.
This closes #5483. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d8a88669 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d8a88669 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d8a88669 Branch: refs/heads/master Commit: d8a8866973f7e0463047963e6b242cdc2cb82fec Parents: d9d89ff Author: Till Rohrmann <[email protected]> Authored: Wed Feb 14 12:09:01 2018 +0100 Committer: Till Rohrmann <[email protected]> Committed: Sun Feb 18 10:12:54 2018 +0100 ---------------------------------------------------------------------- .../flink/runtime/jobmaster/JobMaster.java | 1 - .../runtime/jobmaster/slotpool/SlotPool.java | 89 ++++++++++---------- .../jobmaster/slotpool/SlotPoolRpcTest.java | 72 ++++++---------- .../jobmaster/slotpool/SlotPoolTest.java | 34 +------- 4 files changed, 74 insertions(+), 122 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/d8a88669/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 139c053..dfa4d1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -276,7 +276,6 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast jobGraph.getJobID(), SystemClock.getInstance(), rpcTimeout, - jobMasterConfiguration.getSlotRequestTimeout(), jobMasterConfiguration.getSlotIdleTimeout()); this.slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class); 
http://git-wip-us.apache.org/repos/asf/flink/blob/d8a88669/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java index 72bb7e1..6d714e1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPool.java @@ -67,6 +67,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -111,9 +112,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS /** Timeout for external request calls (e.g. to the ResourceManager or the TaskExecutor). */ private final Time rpcTimeout; - /** Timeout for allocation round trips (RM -> launch TM -> offer slot). */ - private final Time slotRequestTimeout; - /** Timeout for releasing idle slots. 
*/ private final Time idleSlotTimeout; @@ -139,7 +137,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS jobId, SystemClock.getInstance(), AkkaUtils.getDefaultTimeout(), - Time.milliseconds(JobManagerOptions.SLOT_REQUEST_TIMEOUT.defaultValue()), Time.milliseconds(JobManagerOptions.SLOT_IDLE_TIMEOUT.defaultValue())); } @@ -148,7 +145,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS JobID jobId, Clock clock, Time rpcTimeout, - Time slotRequestTimeout, Time idleSlotTimeout) { super(rpcService); @@ -157,7 +153,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS this.clock = checkNotNull(clock); this.rpcTimeout = checkNotNull(rpcTimeout); this.idleSlotTimeout = checkNotNull(idleSlotTimeout); - this.slotRequestTimeout = checkNotNull(slotRequestTimeout); this.registeredTaskManagers = new HashSet<>(16); this.allocatedSlots = new AllocatedSlots(); @@ -307,7 +302,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS scheduledUnit, resourceProfile, locationPreferences, - allowQueuedScheduling); + allowQueuedScheduling, + timeout); } private CompletableFuture<LogicalSlot> internalAllocateSlot( @@ -315,7 +311,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS ScheduledUnit task, ResourceProfile resourceProfile, Collection<TaskManagerLocation> locationPreferences, - boolean allowQueuedScheduling) { + boolean allowQueuedScheduling, + Time allocationTimeout) { final SlotSharingGroupId slotSharingGroupId = task.getSlotSharingGroupId(); @@ -337,13 +334,15 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS multiTaskSlotManager, resourceProfile, locationPreferences, - allowQueuedScheduling); + allowQueuedScheduling, + allocationTimeout); } else { multiTaskSlotLocality = allocateMultiTaskSlot( task.getJobVertexId(), multiTaskSlotManager, resourceProfile, locationPreferences, - 
allowQueuedScheduling); + allowQueuedScheduling, + allocationTimeout); } } catch (NoResourceAvailableException noResourceException) { return FutureUtils.completedExceptionally(noResourceException); @@ -364,7 +363,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS slotRequestId, resourceProfile, locationPreferences, - allowQueuedScheduling); + allowQueuedScheduling, + allocationTimeout); return slotAndLocalityFuture.thenApply( (SlotAndLocality slotAndLocality) -> { @@ -399,6 +399,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS * @param resourceProfile specifying the requirements for the requested slot * @param locationPreferences containing preferred TaskExecutors on which to allocate the slot * @param allowQueuedScheduling true if queued scheduling (the returned task slot must not be completed yet) is allowed, otherwise false + * @param allocationTimeout timeout before the slot allocation times out * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated{@link SlotSharingManager.MultiTaskSlot} * and its locality wrt the given location preferences * @throws NoResourceAvailableException if no task slot could be allocated @@ -408,7 +409,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS SlotSharingManager multiTaskSlotManager, ResourceProfile resourceProfile, Collection<TaskManagerLocation> locationPreferences, - boolean allowQueuedScheduling) throws NoResourceAvailableException { + boolean allowQueuedScheduling, + Time allocationTimeout) throws NoResourceAvailableException { final SlotRequestId coLocationSlotRequestId = coLocationConstraint.getSlotRequestId(); if (coLocationSlotRequestId != null) { @@ -437,7 +439,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS coLocationConstraint.getGroupId(), multiTaskSlotManager, resourceProfile, actualLocationPreferences, - allowQueuedScheduling); + 
allowQueuedScheduling, + allocationTimeout); // check whether we fulfill the co-location constraint if (coLocationConstraint.isAssigned() && multiTaskSlotLocality.getLocality() != Locality.LOCAL) { @@ -493,6 +496,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS * @param resourceProfile specifying the requirements for the requested slot * @param locationPreferences containing preferred TaskExecutors on which to allocate the slot * @param allowQueuedScheduling true if queued scheduling (the returned task slot must not be completed yet) is allowed, otherwise false + * @param allocationTimeout timeout before the slot allocation times out * @return A {@link SlotSharingManager.MultiTaskSlotLocality} which contains the allocated {@link SlotSharingManager.MultiTaskSlot} * and its locality wrt the given location preferences * @throws NoResourceAvailableException if no task slot could be allocated @@ -502,7 +506,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS SlotSharingManager slotSharingManager, ResourceProfile resourceProfile, Collection<TaskManagerLocation> locationPreferences, - boolean allowQueuedScheduling) throws NoResourceAvailableException { + boolean allowQueuedScheduling, + Time allocationTimeout) throws NoResourceAvailableException { // check first whether we have a resolved root slot which we can use SlotSharingManager.MultiTaskSlotLocality multiTaskSlotLocality = slotSharingManager.getResolvedRootSlot( @@ -552,7 +557,10 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS if (multiTaskSlotFuture == null) { // it seems as if we have to request a new slot from the resource manager, this is always the last resort!!! 
- final CompletableFuture<AllocatedSlot> futureSlot = requestNewAllocatedSlot(allocatedSlotRequestId, resourceProfile); + final CompletableFuture<AllocatedSlot> futureSlot = requestNewAllocatedSlot( + allocatedSlotRequestId, + resourceProfile, + allocationTimeout); multiTaskSlotFuture = slotSharingManager.createRootSlot( multiTaskSlotRequestId, @@ -597,13 +605,15 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS * @param resourceProfile which the allocated slot should fulfill * @param locationPreferences for the allocated slot * @param allowQueuedScheduling true if the slot allocation can be completed in the future + * @param allocationTimeout timeout before the slot allocation times out * @return Future containing the allocated simple slot */ private CompletableFuture<SlotAndLocality> requestAllocatedSlot( SlotRequestId slotRequestId, ResourceProfile resourceProfile, Collection<TaskManagerLocation> locationPreferences, - boolean allowQueuedScheduling) { + boolean allowQueuedScheduling, + Time allocationTimeout) { final CompletableFuture<SlotAndLocality> allocatedSlotLocalityFuture; @@ -616,7 +626,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS // we have to request a new allocated slot CompletableFuture<AllocatedSlot> allocatedSlotFuture = requestNewAllocatedSlot( slotRequestId, - resourceProfile); + resourceProfile, + allocationTimeout); allocatedSlotLocalityFuture = allocatedSlotFuture.thenApply((AllocatedSlot allocatedSlot) -> new SlotAndLocality(allocatedSlot, Locality.UNKNOWN)); } else { @@ -634,16 +645,29 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS * * @param slotRequestId identifying the requested slot * @param resourceProfile which the requested slot should fulfill + * @param allocationTimeout timeout before the slot allocation times out * @return An {@link AllocatedSlot} future which is completed once the slot is offered to the {@link SlotPool} 
*/ private CompletableFuture<AllocatedSlot> requestNewAllocatedSlot( - SlotRequestId slotRequestId, - ResourceProfile resourceProfile) { + SlotRequestId slotRequestId, + ResourceProfile resourceProfile, + Time allocationTimeout) { final PendingRequest pendingRequest = new PendingRequest( slotRequestId, resourceProfile); + // register request timeout + FutureUtils + .orTimeout(pendingRequest.getAllocatedSlotFuture(), allocationTimeout.toMilliseconds(), TimeUnit.MILLISECONDS) + .whenCompleteAsync( + (AllocatedSlot ignored, Throwable throwable) -> { + if (throwable != null) { + removePendingRequest(slotRequestId); + } + }, + getMainThreadExecutor()); + if (resourceManagerGateway == null) { stashRequestWaitingForResourceManager(pendingRequest); } else { @@ -678,15 +702,9 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS new SlotRequest(jobId, allocationId, pendingRequest.getResourceProfile(), jobManagerAddress), rpcTimeout); - CompletableFuture<Void> slotRequestProcessingFuture = rmResponse.thenAcceptAsync( - (Acknowledge value) -> { - slotRequestToResourceManagerSuccess(pendingRequest.getSlotRequestId()); - }, - getMainThreadExecutor()); - // on failure, fail the request future - slotRequestProcessingFuture.whenCompleteAsync( - (Void v, Throwable failure) -> { + rmResponse.whenCompleteAsync( + (Acknowledge ignored, Throwable failure) -> { if (failure != null) { slotRequestToResourceManagerFailed(pendingRequest.getSlotRequestId(), failure); } @@ -694,12 +712,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS getMainThreadExecutor()); } - private void slotRequestToResourceManagerSuccess(final SlotRequestId requestId) { - // a request is pending from the ResourceManager to a (future) TaskManager - // we only add the watcher here in case that request times out - scheduleRunAsync(() -> checkTimeoutSlotAllocation(requestId), slotRequestTimeout); - } - private void 
slotRequestToResourceManagerFailed(SlotRequestId slotRequestID, Throwable failure) { PendingRequest request = pendingRequests.removeKeyA(slotRequestID); if (request != null) { @@ -727,17 +739,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway, AllocatedS "Adding as pending request {}", pendingRequest.getSlotRequestId()); waitingForResourceManager.put(pendingRequest.getSlotRequestId(), pendingRequest); - - scheduleRunAsync(() -> checkTimeoutRequestWaitingForResourceManager(pendingRequest.getSlotRequestId()), slotRequestTimeout); - } - - private void checkTimeoutRequestWaitingForResourceManager(SlotRequestId slotRequestId) { - PendingRequest request = waitingForResourceManager.remove(slotRequestId); - if (request != null) { - failPendingRequest( - request, - new TimeoutException("No slot available and no connection to Resource Manager established.")); - } } // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/d8a88669/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java index bf8037d..a9be9cf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolRpcTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit; import 
org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; @@ -50,7 +51,6 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; import akka.actor.ActorSystem; -import akka.pattern.AskTimeoutException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -66,7 +66,6 @@ import java.util.function.Consumer; import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -80,6 +79,8 @@ public class SlotPoolRpcTest extends TestLogger { private static final Time timeout = Time.seconds(10L); + private static final Time fastTimeout = Time.milliseconds(1L); + // ------------------------------------------------------------------------ // setup // ------------------------------------------------------------------------ @@ -111,7 +112,6 @@ public class SlotPoolRpcTest extends TestLogger { jid, SystemClock.getInstance(), TestingUtils.infiniteTime(), - Time.milliseconds(10L), // this is the timeout for the request tested here TestingUtils.infiniteTime() ); @@ -124,7 +124,7 @@ public class SlotPoolRpcTest extends TestLogger { DEFAULT_TESTING_PROFILE, Collections.emptyList(), true, - TestingUtils.infiniteTime()); + fastTimeout); try { future.get(); @@ -146,7 +146,6 @@ public class SlotPoolRpcTest extends TestLogger { jid, SystemClock.getInstance(), TestingUtils.infiniteTime(), - TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); try { @@ -160,27 +159,26 @@ public class SlotPoolRpcTest extends TestLogger { DEFAULT_TESTING_PROFILE, Collections.emptyList(), true, - Time.milliseconds(10L)); + fastTimeout); try { future.get(); - fail("We expected a 
AskTimeoutException."); + fail("We expected a TimeoutException."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException); } - assertEquals(1L, (long) pool.getNumberOfWaitingForResourceRequests().get()); - - slotPoolGateway.releaseSlot(requestId, null, null).get(); - assertEquals(0L, (long) pool.getNumberOfWaitingForResourceRequests().get()); } finally { RpcUtils.terminateRpcEndpoint(pool, timeout); } } + /** + * Tests that a slot allocation times out wrt to the specified time out. + */ @Test - public void testCancelSlotAllocationWithResourceManager() throws Exception { + public void testSlotAllocationTimeout() throws Exception { final JobID jid = new JobID(); final TestingSlotPool pool = new TestingSlotPool( @@ -188,7 +186,6 @@ public class SlotPoolRpcTest extends TestLogger { jid, SystemClock.getInstance(), TestingUtils.infiniteTime(), - TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); try { @@ -201,22 +198,19 @@ public class SlotPoolRpcTest extends TestLogger { SlotRequestId requestId = new SlotRequestId(); CompletableFuture<LogicalSlot> future = slotPoolGateway.allocateSlot( requestId, - new ScheduledUnit(SchedulerTestUtils.getDummyTask()), + new DummyScheduledUnit(), DEFAULT_TESTING_PROFILE, Collections.emptyList(), true, - Time.milliseconds(10L)); + fastTimeout); try { future.get(); - fail("We expected a AskTimeoutException."); + fail("We expected a TimeoutException."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException); } - assertEquals(1L, (long) pool.getNumberOfPendingRequests().get()); - - slotPoolGateway.releaseSlot(requestId, null, null).get(); assertEquals(0L, (long) pool.getNumberOfPendingRequests().get()); } finally { 
RpcUtils.terminateRpcEndpoint(pool, timeout); @@ -224,10 +218,10 @@ public class SlotPoolRpcTest extends TestLogger { } /** - * Tests that allocated slots are not cancelled. + * Tests that extra slots are kept by the {@link SlotPool}. */ @Test - public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception { + public void testExtraSlotsAreKept() throws Exception { final JobID jid = new JobID(); final TestingSlotPool pool = new TestingSlotPool( @@ -235,7 +229,6 @@ public class SlotPoolRpcTest extends TestLogger { jid, SystemClock.getInstance(), TestingUtils.infiniteTime(), - TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); try { @@ -257,15 +250,17 @@ public class SlotPoolRpcTest extends TestLogger { DEFAULT_TESTING_PROFILE, Collections.emptyList(), true, - Time.milliseconds(10L)); + fastTimeout); try { future.get(); - fail("We expected a AskTimeoutException."); + fail("We expected a TimeoutException."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException); } + assertEquals(0L, (long) pool.getNumberOfPendingRequests().get()); + AllocationID allocationId = allocationIdFuture.get(); final SlotOffer slotOffer = new SlotOffer( allocationId, @@ -278,13 +273,6 @@ public class SlotPoolRpcTest extends TestLogger { assertTrue(slotPoolGateway.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer).get()); - assertEquals(0L, (long) pool.getNumberOfPendingRequests().get()); - - assertTrue(pool.containsAllocatedSlot(allocationId).get()); - - pool.releaseSlot(requestId, null, null).get(); - - assertFalse(pool.containsAllocatedSlot(allocationId).get()); assertTrue(pool.containsAvailableSlot(allocationId).get()); } finally { RpcUtils.terminateRpcEndpoint(pool, timeout); @@ -296,7 +284,7 @@ public class SlotPoolRpcTest extends TestLogger { * it will automatically call cancelSlotAllocation as will inject 
future.whenComplete in ProviderAndOwner. */ @Test - public void testProviderAndOwner() throws Exception { + public void testProviderAndOwnerSlotAllocationTimeout() throws Exception { final JobID jid = new JobID(); final TestingSlotPool pool = new TestingSlotPool( @@ -304,7 +292,6 @@ public class SlotPoolRpcTest extends TestLogger { jid, SystemClock.getInstance(), TestingUtils.infiniteTime(), - TestingUtils.infiniteTime(), TestingUtils.infiniteTime()); final CompletableFuture<SlotRequestId> releaseSlotFuture = new CompletableFuture<>(); @@ -317,20 +304,18 @@ public class SlotPoolRpcTest extends TestLogger { ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); pool.connectToResourceManager(resourceManagerGateway); - ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask()); - // test the pending request is clear when timed out CompletableFuture<LogicalSlot> future = pool.getSlotProvider().allocateSlot( - mockScheduledUnit, + new DummyScheduledUnit(), true, Collections.emptyList(), - Time.milliseconds(1L)); + fastTimeout); try { future.get(); - fail("We expected a AskTimeoutException."); + fail("We expected a TimeoutException."); } catch (ExecutionException e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException); + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException); } // wait for the cancel call on the SlotPool @@ -353,14 +338,13 @@ public class SlotPoolRpcTest extends TestLogger { RpcService rpcService, JobID jobId, Clock clock, - Time slotRequestTimeout, Time rpcTimeout, Time idleSlotTimeout) { super( rpcService, jobId, clock, - rpcTimeout, slotRequestTimeout, + rpcTimeout, idleSlotTimeout); releaseSlotConsumer = null; http://git-wip-us.apache.org/repos/asf/flink/blob/d8a88669/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java ---------------------------------------------------------------------- diff 
--git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java index 6a85461..e6446ad 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolTest.java @@ -66,7 +66,6 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import static org.apache.flink.runtime.jobmaster.slotpool.AvailableSlotsTest.DEFAULT_TESTING_PROFILE; import static org.hamcrest.MatcherAssert.assertThat; @@ -587,7 +586,7 @@ public class SlotPoolTest extends TestLogger { rpcService, jobId, clock, - TestingUtils.infiniteTime(), TestingUtils.infiniteTime(), + TestingUtils.infiniteTime(), timeout); try { @@ -628,37 +627,6 @@ public class SlotPoolTest extends TestLogger { } } - /** - * Tests that a slot allocation times out wrt to the specified time out. 
- */ - @Test - public void testSlotAllocationTimeout() throws Exception { - final SlotPool slotPool = new SlotPool(rpcService, jobId); - - final Time allocationTimeout = Time.milliseconds(1L); - - try { - setupSlotPool(slotPool, resourceManagerGateway); - - final SlotProvider slotProvider = slotPool.getSlotProvider(); - - final CompletableFuture<LogicalSlot> allocationFuture = slotProvider.allocateSlot( - new DummyScheduledUnit(), - true, - Collections.emptyList(), - allocationTimeout); - - try { - allocationFuture.get(); - fail("Should have failed with a timeout exception."); - } catch (ExecutionException ee) { - assertThat(ExceptionUtils.stripExecutionException(ee), Matchers.instanceOf(TimeoutException.class)); - } - } finally { - RpcUtils.terminateRpcEndpoint(slotPool, timeout); - } - } - private static SlotPoolGateway setupSlotPool( SlotPool slotPool, ResourceManagerGateway resourceManagerGateway) throws Exception {
