[FLINK-5140] [JobManager] SlotPool accepts allocation requests while ResourceManager is not connected
The requests are kept for a certain time and fulfilled once the ResourceManager is connected. If no ResourceManager is connected in time, the allocation requests are failed. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b3283ec Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b3283ec Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b3283ec Branch: refs/heads/flip-6 Commit: 6b3283ecd980e3db5d5b6cca86885d0dfad6e2cd Parents: 82c1fcf Author: Stephan Ewen <[email protected]> Authored: Fri Dec 2 16:17:11 2016 +0100 Committer: Stephan Ewen <[email protected]> Committed: Mon Dec 5 02:49:43 2016 +0100 ---------------------------------------------------------------------- .../apache/flink/runtime/instance/SlotPool.java | 76 ++++++++++++-- .../flink/runtime/instance/SlotPoolRpcTest.java | 101 +++++++++++++++++++ .../flink/runtime/instance/SlotPoolTest.java | 27 ----- 3 files changed, 166 insertions(+), 38 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index 65a5c45..1a2adfe 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -93,8 +93,6 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { // ------------------------------------------------------------------------ - private final Object lock = new Object(); - private final JobID jobId; private final ProviderAndOwner providerAndOwner; @@ -111,6 +109,9 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { /** All pending requests waiting for slots */ private final HashMap<AllocationID, PendingRequest> pendingRequests; + /** The requests that are waiting for the resource manager to be connected */ + private final HashMap<AllocationID, PendingRequest> waitingForResourceManager; + /** Timeout for request calls to the ResourceManager */ private final Time resourceManagerRequestsTimeout; @@ -154,6 +155,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { this.allocatedSlots = new AllocatedSlots(); this.availableSlots = new AvailableSlots(); this.pendingRequests = new HashMap<>(); + this.waitingForResourceManager = new HashMap<>(); this.providerAndOwner = new ProviderAndOwner(getSelf(), slotRequestTimeout); } @@ -233,6 +235,14 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { public void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway) { this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); this.resourceManagerGateway = checkNotNull(resourceManagerGateway); + + // work on all slots waiting for this connection + for (PendingRequest pending : waitingForResourceManager.values()) { + requestSlotFromResourceManager(pending.allocationID(), pending.future(), pending.resourceProfile()); + } + + // all sent off + waitingForResourceManager.clear(); } @RpcMethod @@ -273,16 +283,27 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { return FlinkCompletableFuture.completed(slot); } - // (2) no slot available, and no resource manager connection + // the request will be completed by a future + final AllocationID allocationID = new AllocationID(); + final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>(); + + // (2) need to request a slot + if (resourceManagerGateway == null) { - return FlinkCompletableFuture.completedExceptionally( - new NoResourceAvailableException("not connected to ResourceManager and no slot available")); - + // no slot available, and no resource manager connection + stashRequestWaitingForResourceManager(allocationID, resources, future); + } else { + // we have a resource manager connection, so let's ask it for more resources + requestSlotFromResourceManager(allocationID, future, resources); } - // (3) we have a resource manager connection, so let's ask it for more resources - final FlinkCompletableFuture<SimpleSlot> future = new FlinkCompletableFuture<>(); - final AllocationID allocationID = new AllocationID(); + return future; + } + + private void requestSlotFromResourceManager( + final AllocationID allocationID, + final FlinkCompletableFuture<SimpleSlot> future, + final ResourceProfile resources) { LOG.info("Requesting slot with profile {} from resource manager (request = {}).", resources, allocationID); @@ -327,8 +348,6 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { return null; } }, getMainThreadExecutor()); - - return future; } private void slotRequestToResourceManagerSuccess(final AllocationID allocationID) { @@ -357,6 +376,32 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { } } + private void stashRequestWaitingForResourceManager( + final AllocationID allocationID, + final ResourceProfile resources, + final FlinkCompletableFuture<SimpleSlot> future) { + + LOG.info("Cannot serve slot request, no ResourceManager connected. " + + "Adding as pending request {}", allocationID); + + waitingForResourceManager.put(allocationID, new PendingRequest(allocationID, future, resources)); + + scheduleRunAsync(new Runnable() { + @Override + public void run() { + checkTimeoutRequestWaitingForResourceManager(allocationID); + } + }, resourceManagerRequestsTimeout); + } + + private void checkTimeoutRequestWaitingForResourceManager(AllocationID allocationID) { + PendingRequest request = waitingForResourceManager.remove(allocationID); + if (request != null && !request.future().isDone()) { + request.future().completeExceptionally(new NoResourceAvailableException( + "No slot available and no connection to Resource Manager established.")); + } + } + // ------------------------------------------------------------------------ // Slot releasing & offering // ------------------------------------------------------------------------ @@ -401,6 +446,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { private PendingRequest pollMatchingPendingRequest(final AllocatedSlot slot) { final ResourceProfile slotResources = slot.getResourceProfile(); + // try the requests sent to the resource manager first for (PendingRequest request : pendingRequests.values()) { if (slotResources.isMatching(request.resourceProfile())) { pendingRequests.remove(request.allocationID()); @@ -408,6 +454,14 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> { } } + // try the requests waiting for a resource manager connection next + for (PendingRequest request : waitingForResourceManager.values()) { + if (slotResources.isMatching(request.resourceProfile())) { + waitingForResourceManager.remove(request.allocationID()); + return request; + } + } + // no request pending, or no request matches return null; } http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java new file mode 100644 index 0000000..89fd22f --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.instance; + +import akka.actor.ActorSystem; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.util.clock.SystemClock; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; + +/** + * Tests for the SlotPool using a proper RPC setup. + */ +public class SlotPoolRpcTest { + + private static RpcService rpcService; + + // ------------------------------------------------------------------------ + // setup + // ------------------------------------------------------------------------ + + @BeforeClass + public static void setup() { + ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new Configuration()); + rpcService = new AkkaRpcService(actorSystem, Time.seconds(10)); + } + + @AfterClass + public static void shutdown() { + rpcService.stopService(); + } + + // ------------------------------------------------------------------------ + // tests + // ------------------------------------------------------------------------ + + @Test + public void testSlotAllocationNoResourceManager() throws Exception { + final JobID jid = new JobID(); + + final SlotPool pool = new SlotPool( + rpcService, jid, + SystemClock.getInstance(), + Time.days(1), Time.days(1), + Time.milliseconds(100) // this is the timeout for the request tested here + ); + pool.start(UUID.randomUUID()); + + Future<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null); + + try { + future.get(4, TimeUnit.SECONDS); + fail("We expected a ExecutionException."); + } + catch (ExecutionException e) { + assertEquals(NoResourceAvailableException.class, e.getCause().getClass()); + } + catch (TimeoutException e) { + fail("future timed out rather than being failed"); + } + catch (Exception e) { + fail("wrong exception: " + e); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index 5fa7af3..97457e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.runtime.concurrent.BiFunction; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; @@ -42,17 +41,13 @@ import org.mockito.ArgumentCaptor; import java.util.List; import java.util.UUID; -import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import static org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -123,28 +118,6 @@ public class SlotPoolTest extends TestLogger { } @Test - public void testAllocateSlotWithoutResourceManager() throws Exception { - slotPool.disconnectResourceManager(); - Future<SimpleSlot> future = slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null); - future.handleAsync( - new BiFunction<SimpleSlot, Throwable, Void>() { - @Override - public Void apply(SimpleSlot simpleSlot, Throwable throwable) { - assertNull(simpleSlot); - assertNotNull(throwable); - return null; - } - }, - rpcService.getExecutor()); - try { - future.get(1, TimeUnit.SECONDS); - fail("We expected a ExecutionException."); - } catch (ExecutionException ex) { - // we expect the exception - } - } - - @Test public void testAllocationFulfilledByReturnedSlot() throws Exception { ResourceID resourceID = new ResourceID("resource"); slotPool.registerTaskManager(resourceID);
