Repository: flink Updated Branches: refs/heads/master 775d7fed1 -> 56aefcd9e
[FLINK-6434] cancel slot allocation if request timed out in ProviderAndOwner Summary: This fixes Flink JIRA issue FLINK-6434. 1. Let the ProviderAndOwner generate the allocation ID before calling allocateSlot on the slot pool. 2. If the allocateSlot call times out, the ProviderAndOwner cancels the previous allocation in the slot pool. Test Plan: UnitTest Reviewers: haitao.w Differential Revision: https://aone.alibaba-inc.com/code/D323990 This closes #4937. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f7481977 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f7481977 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f7481977 Branch: refs/heads/master Commit: f7481977cb558c12f43901f9e3bbe4b10f23b0b4 Parents: 775d7fe Author: shuai.xus <shuai....@alibaba-inc.com> Authored: Fri Oct 20 17:12:39 2017 +0800 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Fri Nov 10 11:23:22 2017 +0100 ---------------------------------------------------------------------- .../apache/flink/runtime/instance/SlotPool.java | 63 +++++++- .../flink/runtime/instance/SlotPoolGateway.java | 20 +++ .../flink/runtime/instance/SlotPoolRpcTest.java | 147 ++++++++++++++++++- .../flink/runtime/instance/SlotPoolTest.java | 21 +-- 4 files changed, 233 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f7481977/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 12dbc63..b033319 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -54,6 +54,7 @@ import 
java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -263,12 +264,13 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { @Override public CompletableFuture<SimpleSlot> allocateSlot( + AllocationID allocationID, ScheduledUnit task, ResourceProfile resources, Iterable<TaskManagerLocation> locationPreferences, Time timeout) { - return internalAllocateSlot(task, resources, locationPreferences); + return internalAllocateSlot(allocationID, task, resources, locationPreferences); } @Override @@ -276,8 +278,28 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { internalReturnAllocatedSlot(slot); } + @Override + public void cancelSlotAllocation(AllocationID allocationID) { + if (waitingForResourceManager.remove(allocationID) == null) { + + PendingRequest request = pendingRequests.remove(allocationID); + if (request != null) { + failPendingRequest(request, new CancellationException("Allocation " + allocationID + " cancelled")); + } else { + + Slot slot = allocatedSlots.get(allocationID); + if (slot != null) { + LOG.info("Return allocated slot {} by cancelling allocation {}.", slot, allocationID); + if (slot.markCancelled()) { + internalReturnAllocatedSlot(slot); + } + } + } + } + } CompletableFuture<SimpleSlot> internalAllocateSlot( + AllocationID allocationID, ScheduledUnit task, ResourceProfile resources, Iterable<TaskManagerLocation> locationPreferences) { @@ -291,7 +313,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { } // the request will be completed by a future - final AllocationID allocationID = new AllocationID(); final CompletableFuture<SimpleSlot> future = new CompletableFuture<>(); // (2) need to request a slot @@ -369,8 +390,14 @@ public class SlotPool extends RpcEndpoint 
implements SlotPoolGateway { private void checkTimeoutSlotAllocation(AllocationID allocationID) { PendingRequest request = pendingRequests.remove(allocationID); - if (request != null && !request.getFuture().isDone()) { - request.getFuture().completeExceptionally(new TimeoutException("Slot allocation request timed out")); + if (request != null) { + failPendingRequest(request, new TimeoutException("Slot allocation request " + allocationID + " timed out")); + } + } + + private void failPendingRequest(PendingRequest pendingRequest, Exception e) { + if (!pendingRequest.getFuture().isDone()) { + pendingRequest.getFuture().completeExceptionally(e); } } @@ -394,8 +421,8 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) { PendingRequest request = waitingForResourceManager.remove(allocationID); - if (request != null && !request.getFuture().isDone()) { - request.getFuture().completeExceptionally(new NoResourceAvailableException( + if (request != null) { + failPendingRequest(request, new NoResourceAvailableException( "No slot available and no connection to Resource Manager established.")); } } @@ -652,6 +679,20 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { return allocatedSlots; } + @VisibleForTesting + AvailableSlots getAvailableSlots() { + return availableSlots; + } + + public CompletableFuture<Integer> getNumberOfWaitingForResourceRequests() { + return CompletableFuture.completedFuture(waitingForResourceManager.size()); + } + + @Override + public CompletableFuture<Integer> getNumberOfPendingRequests() { + return CompletableFuture.completedFuture(pendingRequests.size()); + } + // ------------------------------------------------------------------------ // Helper classes // ------------------------------------------------------------------------ @@ -1014,7 +1055,15 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { 
boolean allowQueued, Collection<TaskManagerLocation> preferredLocations) { - return gateway.allocateSlot(task, ResourceProfile.UNKNOWN, preferredLocations, timeout); + final AllocationID allocationID = new AllocationID(); + CompletableFuture<SimpleSlot> slotFuture = gateway.allocateSlot(allocationID, task, ResourceProfile.UNKNOWN, preferredLocations, timeout); + slotFuture.whenComplete( + (SimpleSlot slot, Throwable failure) -> { + if (failure != null) { + gateway.cancelSlotAllocation(allocationID); + } + }); + return slotFuture; } } http://git-wip-us.apache.org/repos/asf/flink/blob/f7481977/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java index 06c4b12..02d5d38 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.instance; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -86,10 +87,29 @@ public interface SlotPoolGateway extends RpcGateway { // ------------------------------------------------------------------------ CompletableFuture<SimpleSlot> allocateSlot( + AllocationID allocationID, ScheduledUnit task, ResourceProfile resources, Iterable<TaskManagerLocation> locationPreferences, @RpcTimeout Time timeout); void returnAllocatedSlot(Slot slot); + + /** + * Cancel a slot allocation. + * This method should be called when the CompletableFuture returned by allocateSlot completed exceptionally. 
+ * + * @param allocationID the unique id for the previous allocation + */ + void cancelSlotAllocation(AllocationID allocationID); + + // ------------------------------------------------------------------------ + // exposing internal statistic, mainly for testing + // ------------------------------------------------------------------------ + + @VisibleForTesting + CompletableFuture<Integer> getNumberOfWaitingForResourceRequests(); + + @VisibleForTesting + CompletableFuture<Integer> getNumberOfPendingRequests(); } http://git-wip-us.apache.org/repos/asf/flink/blob/f7481977/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java index 9d742e2..b521b75 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java @@ -19,18 +19,24 @@ package org.apache.flink.runtime.instance; import akka.actor.ActorSystem; +import akka.pattern.AskTimeoutException; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils; +import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; import org.apache.flink.runtime.jobmaster.JobMasterId; +import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.util.clock.SystemClock; import org.apache.flink.util.TestLogger; - import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -42,6 +48,8 @@ import java.util.concurrent.TimeoutException; import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; @@ -83,7 +91,7 @@ public class SlotPoolRpcTest extends TestLogger { ); pool.start(JobMasterId.generate(), "foobar"); - CompletableFuture<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.days(1)); + CompletableFuture<SimpleSlot> future = pool.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.days(1)); try { future.get(4, TimeUnit.SECONDS); @@ -99,4 +107,139 @@ public class SlotPoolRpcTest extends TestLogger { fail("wrong exception: " + e); } } + + @Test + public void testCancelSlotAllocationWithoutResourceManager() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(1) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + AllocationID allocationID = new AllocationID(); + CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, 
Time.milliseconds(100)); + + try { + future.get(500, TimeUnit.MILLISECONDS); + fail("We expected a AskTimeoutException."); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof AskTimeoutException); + } + + assertEquals(1, slotPoolGateway.getNumberOfWaitingForResourceRequests().get().intValue()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, slotPoolGateway.getNumberOfWaitingForResourceRequests().get().intValue()); + } + + @Test + public void testCancelSlotAllocationWithResourceManager() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(1) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + pool.connectToResourceManager(resourceManagerGateway); + + AllocationID allocationID = new AllocationID(); + CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.milliseconds(100)); + + try { + future.get(500, TimeUnit.MILLISECONDS); + fail("We expected a AskTimeoutException."); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof AskTimeoutException); + } + + assertEquals(1, slotPoolGateway.getNumberOfPendingRequests().get().intValue()); + + pool.cancelSlotAllocation(allocationID); + assertEquals(0, slotPoolGateway.getNumberOfPendingRequests().get().intValue()); + } + + @Test + public void testCancelSlotAllocationWhileSlotFulfilled() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.seconds(1) // this is the timeout for the request tested here + ); + 
pool.start(JobMasterId.generate(), "foobar"); + SlotPoolGateway slotPoolGateway = pool.getSelfGateway(SlotPoolGateway.class); + + ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + pool.connectToResourceManager(resourceManagerGateway); + + AllocationID allocationID = new AllocationID(); + CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.milliseconds(100)); + + try { + future.get(500, TimeUnit.MILLISECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertTrue(e.getCause() instanceof AskTimeoutException); + } + + ResourceID resourceID = ResourceID.generate(); + AllocatedSlot allocatedSlot = SlotPoolTest.createAllocatedSlot(resourceID, allocationID, jid, DEFAULT_TESTING_PROFILE); + slotPoolGateway.registerTaskManager(resourceID); + assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); + + assertEquals(0, slotPoolGateway.getNumberOfPendingRequests().get().intValue()); + assertTrue(pool.getAllocatedSlots().contains(allocationID)); + + pool.cancelSlotAllocation(allocationID); + assertFalse(pool.getAllocatedSlots().contains(allocationID)); + assertTrue(pool.getAvailableSlots().contains(allocationID)); + } + + /** + * This case makes sure that when allocateSlot in ProviderAndOwner times out, + * cancelSlotAllocation is called automatically by the future.whenComplete callback registered in ProviderAndOwner. 
+ */ + @Test + public void testProviderAndOwner() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.milliseconds(100), Time.days(1), + Time.seconds(1) // this is the timeout for the request tested here + ); + pool.start(JobMasterId.generate(), "foobar"); + ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + pool.connectToResourceManager(resourceManagerGateway); + + ScheduledUnit mockScheduledUnit = new ScheduledUnit(SchedulerTestUtils.getDummyTask()); + + // test the pending request is clear when timed out + CompletableFuture<SimpleSlot> future = pool.getSlotProvider().allocateSlot(mockScheduledUnit, true, null); + + try { + future.get(500, TimeUnit.MILLISECONDS); + fail("We expected a AskTimeoutException."); + } + catch (ExecutionException e) { + assertTrue(e.getCause() instanceof AskTimeoutException); + } + + assertEquals(0, pool.getSelfGateway(SlotPoolGateway.class).getNumberOfPendingRequests().get().intValue()); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f7481977/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index f38894e..bee77e0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -95,8 +95,8 @@ public class SlotPoolTest extends TestLogger { ResourceID resourceID = new ResourceID("resource"); slotPoolGateway.registerTaskManager(resourceID); - ScheduledUnit task = mock(ScheduledUnit.class); - CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(task, DEFAULT_TESTING_PROFILE, null, timeout); + 
AllocationID allocationID = new AllocationID(); + CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(allocationID, mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); assertFalse(future.isDone()); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); @@ -104,6 +104,8 @@ public class SlotPoolTest extends TestLogger { final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); + assertEquals(allocationID, slotRequest.getAllocationId()); + AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); @@ -129,8 +131,8 @@ public class SlotPoolTest extends TestLogger { ResourceID resourceID = new ResourceID("resource"); slotPool.registerTaskManager(resourceID); - CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); - CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); assertFalse(future1.isDone()); assertFalse(future2.isDone()); @@ -176,7 +178,7 @@ public class SlotPoolTest extends TestLogger { ResourceID resourceID = new ResourceID("resource"); slotPoolGateway.registerTaskManager(resourceID); - CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, 
timeout); assertFalse(future1.isDone()); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); @@ -193,7 +195,7 @@ public class SlotPoolTest extends TestLogger { // return this slot to pool slot1.releaseSlot(); - CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); // second allocation fulfilled by previous slot returning SimpleSlot slot2 = future2.get(1, TimeUnit.SECONDS); @@ -219,7 +221,7 @@ public class SlotPoolTest extends TestLogger { ResourceID resourceID = new ResourceID("resource"); slotPoolGateway.registerTaskManager(resourceID); - CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture<SimpleSlot> future = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); assertFalse(future.isDone()); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); @@ -275,14 +277,14 @@ public class SlotPoolTest extends TestLogger { ResourceID resourceID = new ResourceID("resource"); slotPoolGateway.registerTaskManager(resourceID); - CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final 
SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); - CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); + CompletableFuture<SimpleSlot> future2 = slotPoolGateway.allocateSlot(new AllocationID(), mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); AllocatedSlot allocatedSlot = createAllocatedSlot(resourceID, slotRequest.getAllocationId(), jobId, DEFAULT_TESTING_PROFILE); assertTrue(slotPoolGateway.offerSlot(allocatedSlot).get()); @@ -333,6 +335,7 @@ public class SlotPoolTest extends TestLogger { slotPoolGateway.connectToResourceManager(resourceManagerGateway); CompletableFuture<SimpleSlot> slotFuture = slotPoolGateway.allocateSlot( + new AllocationID(), scheduledUnit, ResourceProfile.UNKNOWN, Collections.emptyList(),