[FLINK-7870] [tests] Add SlotPool test to verify cancellation of failed slot requests

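Adds SlotPoolTest#testSlotRequestCancellationUponFailingRequest together with the TestingResourceManagerGateway test utility it relies on, and removes a block of pending slot request assertions from SlotManagerTest.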


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/755ae519
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/755ae519
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/755ae519

Branch: refs/heads/master
Commit: 755ae519255f146aac49784af7bbe049d2c1fd13
Parents: 902425f
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Mon Nov 6 12:16:04 2017 +0100
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Nov 7 15:07:46 2017 +0100

----------------------------------------------------------------------
 .../flink/runtime/instance/SlotPoolTest.java    |  65 ++++++
 .../slotmanager/SlotManagerTest.java            |   9 -
 .../utils/TestingResourceManagerGateway.java    | 231 +++++++++++++++++++
 3 files changed, 296 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/755ae519/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
index 5993dcb..f38894e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java
@@ -23,22 +23,31 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import 
org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -48,6 +57,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.RETURNS_MOCKS;
 import static org.mockito.Mockito.mock;
@@ -56,6 +66,8 @@ import static org.mockito.Mockito.when;
 
 public class SlotPoolTest extends TestLogger {
 
+       private static final Logger LOG = 
LoggerFactory.getLogger(SlotPoolTest.class);
+
        private final Time timeout = Time.seconds(10L);
 
        private RpcService rpcService;
@@ -294,6 +306,59 @@ public class SlotPoolTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests that a slot request is cancelled if it failed with an exception (e.g. TimeoutException).
+        *
+        * <p>The {@link TestingResourceManagerGateway} fails the slot request with a
+        * {@link FlinkException}. The test then checks two things. First, the slot future returned by
+        * the {@link SlotPoolGateway} fails. Second, the slot pool asks the resource manager to cancel
+        * the slot request carrying the same {@link AllocationID} as the original request.
+        *
+        * <p>All blocking waits are bounded by {@code timeout}, so a regression fails the test
+        * instead of hanging the build.
+        *
+        * <p>See FLINK-7870
+        */
+       @Test
+       public void testSlotRequestCancellationUponFailingRequest() throws Exception {
+               final SlotPool slotPool = new SlotPool(rpcService, jobId);
+               // response of the resource manager to the slot request; failed further below
+               final CompletableFuture<Acknowledge> requestSlotFuture = new CompletableFuture<>();
+               // completed with the allocation id passed to ResourceManagerGateway#cancelSlotRequest
+               final CompletableFuture<AllocationID> cancelSlotFuture = new CompletableFuture<>();
+               // completed with the allocation id of the slot request sent to the resource manager
+               final CompletableFuture<AllocationID> requestSlotFutureAllocationId = new CompletableFuture<>();
+
+               final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
+               resourceManagerGateway.setRequestSlotFuture(requestSlotFuture);
+               resourceManagerGateway.setRequestSlotConsumer(slotRequest -> requestSlotFutureAllocationId.complete(slotRequest.getAllocationId()));
+               resourceManagerGateway.setCancelSlotConsumer(allocationID -> cancelSlotFuture.complete(allocationID));
+
+               final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class));
+
+               try {
+                       slotPool.start(JobMasterId.generate(), "localhost");
+
+                       final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class);
+
+                       slotPoolGateway.connectToResourceManager(resourceManagerGateway);
+
+                       CompletableFuture<SimpleSlot> slotFuture = slotPoolGateway.allocateSlot(
+                               scheduledUnit,
+                               ResourceProfile.UNKNOWN,
+                               Collections.emptyList(),
+                               timeout);
+
+                       // let the resource manager's answer to the slot request fail
+                       requestSlotFuture.completeExceptionally(new FlinkException("Testing exception."));
+
+                       try {
+                               slotFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
+                               fail("The slot future should not have been completed properly.");
+                       } catch (java.util.concurrent.ExecutionException expected) {
+                               // expected: the failed slot request must fail the slot future.
+                               // A TimeoutException is deliberately not caught so that it fails the test.
+                       }
+
+                       // check that a failure triggered the slot request cancellation
+                       // with the correct allocation id
+                       assertEquals(
+                               requestSlotFutureAllocationId.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS),
+                               cancelSlotFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
+               } finally {
+                       try {
+                               RpcUtils.terminateRpcEndpoint(slotPool, timeout);
+                       } catch (Exception e) {
+                               LOG.warn("Could not properly terminate the SlotPool.", e);
+                       }
+               }
+       }
+
        private static ResourceManagerGateway 
createResourceManagerGatewayMock() {
                ResourceManagerGateway resourceManagerGateway = 
mock(ResourceManagerGateway.class);
                when(resourceManagerGateway

http://git-wip-us.apache.org/repos/asf/flink/blob/755ae519/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
index 55a9946..cf0aef9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java
@@ -292,15 +292,6 @@ public class SlotManagerTest extends TestLogger {
                final TaskExecutorConnection taskManagerConnection = new 
TaskExecutorConnection(taskExecutorGateway);
 
                try (SlotManager slotManager = 
createSlotManager(resourceManagerId, resourceManagerActions)) {
-                       // verify that if the request has not been assigned, 
should cancel the resource allocation
-                       slotManager.registerSlotRequest(slotRequest);
-                       PendingSlotRequest pendingSlotRequest = 
slotManager.getSlotRequest(allocationId);
-                       assertFalse(pendingSlotRequest.isAssigned());
-
-                       slotManager.unregisterSlotRequest(allocationId);
-                       pendingSlotRequest = 
slotManager.getSlotRequest(allocationId);
-                       assertTrue(pendingSlotRequest == null);
-
                        slotManager.registerTaskManager(taskManagerConnection, 
slotReport);
 
                        TaskManagerSlot slot = slotManager.getSlot(slotId);

http://git-wip-us.apache.org/repos/asf/flink/blob/755ae519/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
new file mode 100644
index 0000000..f11a1eb
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.utils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
+import org.apache.flink.runtime.resourcemanager.ResourceOverview;
+import org.apache.flink.runtime.resourcemanager.SlotRequest;
+import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo;
+import org.apache.flink.runtime.taskexecutor.SlotReport;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of the {@link ResourceManagerGateway} solely for testing purposes.
+ *
+ * <p>Behavior of the gateway:
+ * <ul>
+ *     <li>Registration calls succeed immediately.</li>
+ *     <li>Slot requests are acknowledged immediately, unless a custom response future has been set
+ *     via {@link #setRequestSlotFuture(CompletableFuture)}.</li>
+ *     <li>{@link #setRequestSlotConsumer(Consumer)} and {@link #setCancelSlotConsumer(Consumer)} let
+ *     tests observe incoming slot requests and slot request cancellations.</li>
+ *     <li>The remaining calls are no-ops or return empty or failed futures.</li>
+ * </ul>
+ */
+public class TestingResourceManagerGateway implements ResourceManagerGateway {
+
+       private final ResourceManagerId resourceManagerId;
+
+       /** Resource id of this resource manager, reported back on successful registrations. */
+       private final ResourceID resourceId;
+
+       private final long heartbeatInterval;
+
+       private final String address;
+
+       private final String hostname;
+
+       /** Response for the next {@link #requestSlot} call; reset to null once it has been handed out. */
+       private final AtomicReference<CompletableFuture<Acknowledge>> slotFutureReference;
+
+       /** Optional callback notified about every slot request cancellation; may be null. */
+       private volatile Consumer<AllocationID> cancelSlotConsumer;
+
+       /** Optional callback notified about every incoming slot request; may be null. */
+       private volatile Consumer<SlotRequest> requestSlotConsumer;
+
+       /**
+        * Creates a testing gateway with freshly generated resource manager and resource ids,
+        * a heartbeat interval of 10000 and "localhost" as address and hostname.
+        */
+       public TestingResourceManagerGateway() {
+               this(
+                       ResourceManagerId.generate(),
+                       ResourceID.generate(),
+                       10000L,
+                       "localhost",
+                       "localhost");
+       }
+
+       /**
+        * Creates a testing gateway with the given identity.
+        *
+        * @param resourceManagerId fencing token returned by {@link #getFencingToken()}
+        * @param resourceId resource id of this resource manager, reported on successful registrations
+        * @param heartbeatInterval heartbeat interval reported on successful registrations
+        * @param address address returned by {@link #getAddress()}
+        * @param hostname hostname returned by {@link #getHostname()}
+        * @throws NullPointerException if any of the object arguments is null
+        */
+       public TestingResourceManagerGateway(
+                       ResourceManagerId resourceManagerId,
+                       ResourceID resourceId,
+                       long heartbeatInterval,
+                       String address,
+                       String hostname) {
+               this.resourceManagerId = Preconditions.checkNotNull(resourceManagerId);
+               this.resourceId = Preconditions.checkNotNull(resourceId);
+               this.heartbeatInterval = heartbeatInterval;
+               this.address = Preconditions.checkNotNull(address);
+               this.hostname = Preconditions.checkNotNull(hostname);
+               this.slotFutureReference = new AtomicReference<>();
+               this.cancelSlotConsumer = null;
+               this.requestSlotConsumer = null;
+       }
+
+       /**
+        * Sets the future returned by the next {@link #requestSlot} call.
+        *
+        * <p>The future is handed out only once. Later slot requests are acknowledged immediately
+        * again.
+        *
+        * @param slotFuture response future for the next slot request
+        */
+       public void setRequestSlotFuture(CompletableFuture<Acknowledge> slotFuture) {
+               this.slotFutureReference.set(slotFuture);
+       }
+
+       /**
+        * Sets a callback which receives the allocation id of every {@link #cancelSlotRequest} call.
+        *
+        * @param cancelSlotConsumer callback to notify, or null to disable notifications
+        */
+       public void setCancelSlotConsumer(Consumer<AllocationID> cancelSlotConsumer) {
+               this.cancelSlotConsumer = cancelSlotConsumer;
+       }
+
+       /**
+        * Sets a callback which receives every {@link SlotRequest} passed to {@link #requestSlot}.
+        *
+        * @param slotRequestConsumer callback to notify, or null to disable notifications
+        */
+       public void setRequestSlotConsumer(Consumer<SlotRequest> slotRequestConsumer) {
+               this.requestSlotConsumer = slotRequestConsumer;
+       }
+
+       @Override
+       public CompletableFuture<RegistrationResponse> registerJobManager(JobMasterId jobMasterId, ResourceID jobMasterResourceId, String jobMasterAddress, JobID jobId, Time timeout) {
+               return CompletableFuture.completedFuture(
+                       new JobMasterRegistrationSuccess(
+                               heartbeatInterval,
+                               resourceManagerId,
+                               resourceId));
+       }
+
+       @Override
+       public CompletableFuture<Acknowledge> requestSlot(JobMasterId jobMasterId, SlotRequest slotRequest, Time timeout) {
+               // read the volatile field once so the null check and the call see the same consumer
+               Consumer<SlotRequest> currentRequestSlotConsumer = requestSlotConsumer;
+
+               if (currentRequestSlotConsumer != null) {
+                       currentRequestSlotConsumer.accept(slotRequest);
+               }
+
+               // atomically take the one-shot response so that at most one request receives it
+               CompletableFuture<Acknowledge> slotFuture = slotFutureReference.getAndSet(null);
+
+               if (slotFuture != null) {
+                       return slotFuture;
+               } else {
+                       return CompletableFuture.completedFuture(Acknowledge.get());
+               }
+       }
+
+       @Override
+       public void cancelSlotRequest(AllocationID allocationID) {
+               // read the volatile field once so the null check and the call see the same consumer
+               Consumer<AllocationID> currentCancelSlotConsumer = cancelSlotConsumer;
+
+               if (currentCancelSlotConsumer != null) {
+                       currentCancelSlotConsumer.accept(allocationID);
+               }
+       }
+
+       @Override
+       public CompletableFuture<RegistrationResponse> registerTaskExecutor(String taskExecutorAddress, ResourceID taskExecutorResourceId, SlotReport slotReport, int dataPort, HardwareDescription hardwareDescription, Time timeout) {
+               // Report this resource manager's own resource id, as registerJobManager does. The
+               // parameter is named taskExecutorResourceId so that it cannot shadow the resourceId
+               // field and echo the task executor's id back by accident.
+               return CompletableFuture.completedFuture(
+                       new TaskExecutorRegistrationSuccess(
+                               new InstanceID(),
+                               resourceId,
+                               heartbeatInterval));
+       }
+
+       @Override
+       public void notifySlotAvailable(InstanceID instanceId, SlotID slotID, AllocationID oldAllocationId) {
+               // no-op
+       }
+
+       @Override
+       public void registerInfoMessageListener(String infoMessageListenerAddress) {
+               // no-op
+       }
+
+       @Override
+       public void unRegisterInfoMessageListener(String infoMessageListenerAddress) {
+               // no-op
+       }
+
+       @Override
+       public void shutDownCluster(ApplicationStatus finalStatus, String optionalDiagnostics) {
+               // no-op
+       }
+
+       @Override
+       public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers() {
+               return CompletableFuture.completedFuture(0);
+       }
+
+       @Override
+       public void heartbeatFromTaskManager(ResourceID heartbeatOrigin, SlotReport slotReport) {
+               // no-op
+       }
+
+       @Override
+       public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
+               // no-op
+       }
+
+       @Override
+       public void disconnectTaskManager(ResourceID resourceID, Exception cause) {
+               // no-op
+       }
+
+       @Override
+       public void disconnectJobManager(JobID jobId, Exception cause) {
+               // no-op
+       }
+
+       @Override
+       public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Time timeout) {
+               return CompletableFuture.completedFuture(Collections.emptyList());
+       }
+
+       @Override
+       public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, Time timeout) {
+               return FutureUtils.completedExceptionally(new UnsupportedOperationException("Not yet implemented"));
+       }
+
+       @Override
+       public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
+               return FutureUtils.completedExceptionally(new UnsupportedOperationException("Not yet implemented"));
+       }
+
+       @Override
+       public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+               return CompletableFuture.completedFuture(Collections.emptyList());
+       }
+
+       @Override
+       public ResourceManagerId getFencingToken() {
+               return resourceManagerId;
+       }
+
+       @Override
+       public String getAddress() {
+               return address;
+       }
+
+       @Override
+       public String getHostname() {
+               return hostname;
+       }
+}

Reply via email to