[FLINK-5140] [JobManager] SlotPool accepts allocation requests while 
ResourceManager is not connected

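Requests that arrive while no ResourceManager is connected are stashed and 
forwarded to the ResourceManager as soon as it connects.
If no ResourceManager connects before the ResourceManager request timeout 
expires, the stashed requests are failed with a NoResourceAvailableException.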


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6b3283ec
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6b3283ec
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6b3283ec

Branch: refs/heads/flip-6
Commit: 6b3283ecd980e3db5d5b6cca86885d0dfad6e2cd
Parents: 82c1fcf
Author: Stephan Ewen <[email protected]>
Authored: Fri Dec 2 16:17:11 2016 +0100
Committer: Stephan Ewen <[email protected]>
Committed: Mon Dec 5 02:49:43 2016 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/instance/SlotPool.java |  76 ++++++++++++--
 .../flink/runtime/instance/SlotPoolRpcTest.java | 101 +++++++++++++++++++
 .../flink/runtime/instance/SlotPoolTest.java    |  27 -----
 3 files changed, 166 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 65a5c45..1a2adfe 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -93,8 +93,6 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
 
        // 
------------------------------------------------------------------------
 
-       private final Object lock = new Object();
-
        private final JobID jobId;
 
        private final ProviderAndOwner providerAndOwner;
@@ -111,6 +109,9 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
        /** All pending requests waiting for slots */
        private final HashMap<AllocationID, PendingRequest> pendingRequests;
 
+       /** The requests that are waiting for the resource manager to be 
connected */
+       private final HashMap<AllocationID, PendingRequest> 
waitingForResourceManager;
+
        /** Timeout for request calls to the ResourceManager */
        private final Time resourceManagerRequestsTimeout;
 
@@ -154,6 +155,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
                this.allocatedSlots = new AllocatedSlots();
                this.availableSlots = new AvailableSlots();
                this.pendingRequests = new HashMap<>();
+               this.waitingForResourceManager = new HashMap<>();
 
                this.providerAndOwner = new ProviderAndOwner(getSelf(), 
slotRequestTimeout);
        }
@@ -233,6 +235,14 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> 
{
        public void connectToResourceManager(UUID resourceManagerLeaderId, 
ResourceManagerGateway resourceManagerGateway) {
                this.resourceManagerLeaderId = 
checkNotNull(resourceManagerLeaderId);
                this.resourceManagerGateway = 
checkNotNull(resourceManagerGateway);
+
+               // work on all slots waiting for this connection
+               for (PendingRequest pending : 
waitingForResourceManager.values()) {
+                       requestSlotFromResourceManager(pending.allocationID(), 
pending.future(), pending.resourceProfile());
+               }
+
+               // all sent off
+               waitingForResourceManager.clear();
        }
 
        @RpcMethod
@@ -273,16 +283,27 @@ public class SlotPool extends 
RpcEndpoint<SlotPoolGateway> {
                        return FlinkCompletableFuture.completed(slot);
                }
 
-               // (2) no slot available, and no resource manager connection
+               // the request will be completed by a future
+               final AllocationID allocationID = new AllocationID();
+               final FlinkCompletableFuture<SimpleSlot> future = new 
FlinkCompletableFuture<>();
+
+               // (2) need to request a slot
+
                if (resourceManagerGateway == null) {
-                       return FlinkCompletableFuture.completedExceptionally(
-                                       new NoResourceAvailableException("not 
connected to ResourceManager and no slot available"));
-                       
+                       // no slot available, and no resource manager connection
+                       stashRequestWaitingForResourceManager(allocationID, 
resources, future);
+               } else {
+                       // we have a resource manager connection, so let's ask 
it for more resources
+                       requestSlotFromResourceManager(allocationID, future, 
resources);
                }
 
-               // (3) we have a resource manager connection, so let's ask it 
for more resources
-               final FlinkCompletableFuture<SimpleSlot> future = new 
FlinkCompletableFuture<>();
-               final AllocationID allocationID = new AllocationID();
+               return future;
+       }
+
+       private void requestSlotFromResourceManager(
+                       final AllocationID allocationID,
+                       final FlinkCompletableFuture<SimpleSlot> future,
+                       final ResourceProfile resources) {
 
                LOG.info("Requesting slot with profile {} from resource manager 
(request = {}).", resources, allocationID);
 
@@ -327,8 +348,6 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
                                return null;
                        }
                }, getMainThreadExecutor());
-
-               return future;
        }
 
        private void slotRequestToResourceManagerSuccess(final AllocationID 
allocationID) {
@@ -357,6 +376,32 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> 
{
                }
        }
 
+       private void stashRequestWaitingForResourceManager(
+                       final AllocationID allocationID,
+                       final ResourceProfile resources,
+                       final FlinkCompletableFuture<SimpleSlot> future) {
+
+               LOG.info("Cannot serve slot request, no ResourceManager 
connected. " +
+                               "Adding as pending request {}",  allocationID);
+
+               waitingForResourceManager.put(allocationID, new 
PendingRequest(allocationID, future, resources));
+
+               scheduleRunAsync(new Runnable() {
+                       @Override
+                       public void run() {
+                               
checkTimeoutRequestWaitingForResourceManager(allocationID);
+                       }
+               }, resourceManagerRequestsTimeout);
+       }
+
+       private void checkTimeoutRequestWaitingForResourceManager(AllocationID 
allocationID) {
+               PendingRequest request = 
waitingForResourceManager.remove(allocationID);
+               if (request != null && !request.future().isDone()) {
+                       request.future().completeExceptionally(new 
NoResourceAvailableException(
+                                       "No slot available and no connection to 
Resource Manager established."));
+               }
+       }
+
        // 
------------------------------------------------------------------------
        //  Slot releasing & offering
        // 
------------------------------------------------------------------------
@@ -401,6 +446,7 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> {
        private PendingRequest pollMatchingPendingRequest(final AllocatedSlot 
slot) {
                final ResourceProfile slotResources = slot.getResourceProfile();
 
+               // try the requests sent to the resource manager first
                for (PendingRequest request : pendingRequests.values()) {
                        if 
(slotResources.isMatching(request.resourceProfile())) {
                                pendingRequests.remove(request.allocationID());
@@ -408,6 +454,14 @@ public class SlotPool extends RpcEndpoint<SlotPoolGateway> 
{
                        }
                }
 
+               // try the requests waiting for a resource manager connection 
next
+               for (PendingRequest request : 
waitingForResourceManager.values()) {
+                       if 
(slotResources.isMatching(request.resourceProfile())) {
+                               
waitingForResourceManager.remove(request.allocationID());
+                               return request;
+                       }
+               }
+
                // no request pending, or no request matches
                return null;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
new file mode 100644
index 0000000..89fd22f
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.instance;
+
+import akka.actor.ActorSystem;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
+import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.util.clock.SystemClock;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for the SlotPool using a proper RPC setup.
+ */
+public class SlotPoolRpcTest {
+
+       private static RpcService rpcService;
+
+       // 
------------------------------------------------------------------------
+       //  setup
+       // 
------------------------------------------------------------------------
+
+       @BeforeClass
+       public static void setup() {
+               ActorSystem actorSystem = AkkaUtils.createLocalActorSystem(new 
Configuration());
+               rpcService = new AkkaRpcService(actorSystem, Time.seconds(10));
+       }
+
+       @AfterClass
+       public static  void shutdown() {
+               rpcService.stopService();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  tests
+       // 
------------------------------------------------------------------------
+
+       @Test
+       public void testSlotAllocationNoResourceManager() throws Exception {
+               final JobID jid = new JobID();
+               
+               final SlotPool pool = new SlotPool(
+                               rpcService, jid,
+                               SystemClock.getInstance(),
+                               Time.days(1), Time.days(1),
+                               Time.milliseconds(100) // this is the timeout 
for the request tested here
+               );
+               pool.start(UUID.randomUUID());
+
+               Future<SimpleSlot> future = 
pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null);
+
+               try {
+                       future.get(4, TimeUnit.SECONDS);
+                       fail("We expected a ExecutionException.");
+               }
+               catch (ExecutionException e) {
+                       assertEquals(NoResourceAvailableException.class, 
e.getCause().getClass());
+               }
+               catch (TimeoutException e) {
+                       fail("future timed out rather than being failed");
+               }
+               catch (Exception e) {
+                       fail("wrong exception: " + e);
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6b3283ec/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 5fa7af3..97457e1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
@@ -42,17 +41,13 @@ import org.mockito.ArgumentCaptor;
 
 import java.util.List;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import static 
org.apache.flink.runtime.instance.AvailableSlotsTest.DEFAULT_TESTING_PROFILE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -123,28 +118,6 @@ public class SlotPoolTest extends TestLogger {
        }
 
        @Test
-       public void testAllocateSlotWithoutResourceManager() throws Exception {
-               slotPool.disconnectResourceManager();
-               Future<SimpleSlot> future = 
slotPool.allocateSlot(mock(ScheduledUnit.class),DEFAULT_TESTING_PROFILE, null);
-               future.handleAsync(
-                       new BiFunction<SimpleSlot, Throwable, Void>() {
-                               @Override
-                               public Void apply(SimpleSlot simpleSlot, 
Throwable throwable) {
-                                       assertNull(simpleSlot);
-                                       assertNotNull(throwable);
-                                       return null;
-                               }
-                       },
-                       rpcService.getExecutor());
-               try {
-                       future.get(1, TimeUnit.SECONDS);
-                       fail("We expected a ExecutionException.");
-               } catch (ExecutionException ex) {
-                       // we expect the exception
-               }
-       }
-
-       @Test
        public void testAllocationFulfilledByReturnedSlot() throws Exception {
                ResourceID resourceID = new ResourceID("resource");
                slotPool.registerTaskManager(resourceID);

Reply via email to