[FLINK-7870] [tests] Add SlotPool test to verify cancellation of failed slot requests
Adds the SlotPoolTest#testSlotRequestCancellationUponFailingRequest. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/755ae519 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/755ae519 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/755ae519 Branch: refs/heads/master Commit: 755ae519255f146aac49784af7bbe049d2c1fd13 Parents: 902425f Author: Till Rohrmann <trohrm...@apache.org> Authored: Mon Nov 6 12:16:04 2017 +0100 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Tue Nov 7 15:07:46 2017 +0100 ---------------------------------------------------------------------- .../flink/runtime/instance/SlotPoolTest.java | 65 ++++++ .../slotmanager/SlotManagerTest.java | 9 - .../utils/TestingResourceManagerGateway.java | 231 +++++++++++++++++++ 3 files changed, 296 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/755ae519/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index 5993dcb..f38894e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -23,22 +23,31 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import 
org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.RpcUtils; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; + import org.junit.After; import org.junit.Before; import org.junit.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -48,6 +57,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Matchers.any; import static org.mockito.Mockito.RETURNS_MOCKS; import static org.mockito.Mockito.mock; @@ -56,6 +66,8 @@ import static org.mockito.Mockito.when; public class SlotPoolTest extends TestLogger { + private static final Logger LOG = LoggerFactory.getLogger(SlotPoolTest.class); + private final Time timeout = Time.seconds(10L); private RpcService rpcService; @@ -294,6 +306,59 @@ public class SlotPoolTest extends TestLogger { } } + /** + * Tests that a slot request is cancelled if it failed with an exception (e.g. TimeoutException). 
+ * + * <p>See FLINK-7870 + */ + @Test + public void testSlotRequestCancellationUponFailingRequest() throws Exception { + final SlotPool slotPool = new SlotPool(rpcService, jobId); + final CompletableFuture<Acknowledge> requestSlotFuture = new CompletableFuture<>(); + final CompletableFuture<AllocationID> cancelSlotFuture = new CompletableFuture<>(); + final CompletableFuture<AllocationID> requestSlotFutureAllocationId = new CompletableFuture<>(); + + final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); + resourceManagerGateway.setRequestSlotFuture(requestSlotFuture); + resourceManagerGateway.setRequestSlotConsumer(slotRequest -> requestSlotFutureAllocationId.complete(slotRequest.getAllocationId())); + resourceManagerGateway.setCancelSlotConsumer(allocationID -> cancelSlotFuture.complete(allocationID)); + + final ScheduledUnit scheduledUnit = new ScheduledUnit(mock(Execution.class)); + + try { + slotPool.start(JobMasterId.generate(), "localhost"); + + final SlotPoolGateway slotPoolGateway = slotPool.getSelfGateway(SlotPoolGateway.class); + + slotPoolGateway.connectToResourceManager(resourceManagerGateway); + + CompletableFuture<SimpleSlot> slotFuture = slotPoolGateway.allocateSlot( + scheduledUnit, + ResourceProfile.UNKNOWN, + Collections.emptyList(), + timeout); + + requestSlotFuture.completeExceptionally(new FlinkException("Testing exception.")); + + try { + slotFuture.get(); + fail("The slot future should not have been completed properly."); + } catch (Exception ignored) { + // expected + } + + // check that a failure triggered the slot request cancellation + // with the correct allocation id + assertEquals(requestSlotFutureAllocationId.get(), cancelSlotFuture.get()); + } finally { + try { + RpcUtils.terminateRpcEndpoint(slotPool, timeout); + } catch (Exception e) { + LOG.warn("Could not properly terminate the SlotPool.", e); + } + } + } + private static ResourceManagerGateway createResourceManagerGatewayMock() { 
ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); when(resourceManagerGateway http://git-wip-us.apache.org/repos/asf/flink/blob/755ae519/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java index 55a9946..cf0aef9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerTest.java @@ -292,15 +292,6 @@ public class SlotManagerTest extends TestLogger { final TaskExecutorConnection taskManagerConnection = new TaskExecutorConnection(taskExecutorGateway); try (SlotManager slotManager = createSlotManager(resourceManagerId, resourceManagerActions)) { - // verify that if the request has not been assigned, should cancel the resource allocation - slotManager.registerSlotRequest(slotRequest); - PendingSlotRequest pendingSlotRequest = slotManager.getSlotRequest(allocationId); - assertFalse(pendingSlotRequest.isAssigned()); - - slotManager.unregisterSlotRequest(allocationId); - pendingSlotRequest = slotManager.getSlotRequest(allocationId); - assertTrue(pendingSlotRequest == null); - slotManager.registerTaskManager(taskManagerConnection, slotReport); TaskManagerSlot slot = slotManager.getSlot(slotId); http://git-wip-us.apache.org/repos/asf/flink/blob/755ae519/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java new file mode 100644 index 0000000..f11a1eb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.resourcemanager.utils; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.instance.HardwareDescription; +import org.apache.flink.runtime.instance.InstanceID; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; +import org.apache.flink.runtime.resourcemanager.ResourceOverview; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerInfo; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; + +/** + * Implementation of the {@link ResourceManagerGateway} for testing purposes solely. 
+ *
+ * <p>Slot requests are answered with the future installed through
+ * {@link #setRequestSlotFuture(CompletableFuture)}. That future is handed out at most once; every
+ * other request is acknowledged immediately. Optional consumers can be registered to observe
+ * incoming slot requests and slot request cancellations. All remaining gateway methods are either
+ * no-ops or return canned values/futures.
+ */
+public class TestingResourceManagerGateway implements ResourceManagerGateway {
+
+    /** Holds the prepared answer for the next slot request; cleared when it is handed out. */
+    private final AtomicReference<CompletableFuture<Acknowledge>> slotFutureReference = new AtomicReference<>();
+
+    /** Fencing token reported by this gateway and used in job master registration responses. */
+    private final ResourceManagerId resourceManagerId;
+
+    /** Resource id reported in job master registration responses. */
+    private final ResourceID resourceId;
+
+    /** Heartbeat interval reported in registration responses. */
+    private final long heartbeatInterval;
+
+    private final String address;
+
+    private final String hostname;
+
+    /** Observer for incoming slot requests; {@code null} until one is registered. */
+    private volatile Consumer<SlotRequest> requestSlotConsumer;
+
+    /** Observer for slot request cancellations; {@code null} until one is registered. */
+    private volatile Consumer<AllocationID> cancelSlotConsumer;
+
+    /**
+     * Creates a gateway with freshly generated ids, a heartbeat interval of {@code 10000} and
+     * {@code "localhost"} as both address and hostname.
+     */
+    public TestingResourceManagerGateway() {
+        this(ResourceManagerId.generate(), ResourceID.generate(), 10000L, "localhost", "localhost");
+    }
+
+    /**
+     * Creates a gateway reporting the given identity.
+     *
+     * @param resourceManagerId fencing token of the gateway, must not be null
+     * @param resourceId resource id of the gateway, must not be null
+     * @param heartbeatInterval heartbeat interval reported in registration responses
+     * @param address address of the gateway, must not be null
+     * @param hostname hostname of the gateway, must not be null
+     */
+    public TestingResourceManagerGateway(
+            ResourceManagerId resourceManagerId,
+            ResourceID resourceId,
+            long heartbeatInterval,
+            String address,
+            String hostname) {
+        // Both consumers intentionally stay at their default (null) value: no observer registered.
+        this.resourceManagerId = Preconditions.checkNotNull(resourceManagerId);
+        this.resourceId = Preconditions.checkNotNull(resourceId);
+        this.heartbeatInterval = heartbeatInterval;
+        this.address = Preconditions.checkNotNull(address);
+        this.hostname = Preconditions.checkNotNull(hostname);
+    }
+
+    // ------------------------------------------------------------------------
+    //  Test hooks
+    // ------------------------------------------------------------------------
+
+    /**
+     * Installs the future returned by the next {@code requestSlot} call.
+     *
+     * @param slotFuture future to hand out for the next slot request
+     */
+    public void setRequestSlotFuture(CompletableFuture<Acknowledge> slotFuture) {
+        slotFutureReference.set(slotFuture);
+    }
+
+    /**
+     * Registers the consumer that is called with the allocation id of every cancelled slot request.
+     *
+     * @param cancelSlotConsumer consumer to notify, or null to disable the notification
+     */
+    public void setCancelSlotConsumer(Consumer<AllocationID> cancelSlotConsumer) {
+        this.cancelSlotConsumer = cancelSlotConsumer;
+    }
+
+    /**
+     * Registers the consumer that is called with every incoming slot request.
+     *
+     * @param slotRequestConsumer consumer to notify, or null to disable the notification
+     */
+    public void setRequestSlotConsumer(Consumer<SlotRequest> slotRequestConsumer) {
+        this.requestSlotConsumer = slotRequestConsumer;
+    }
+
+    // ------------------------------------------------------------------------
+    //  ResourceManagerGateway
+    // ------------------------------------------------------------------------
+
+    @Override
+    public CompletableFuture<RegistrationResponse> registerJobManager(
+            JobMasterId jobMasterId,
+            ResourceID jobMasterResourceId,
+            String jobMasterAddress,
+            JobID jobId,
+            Time timeout) {
+        final RegistrationResponse success =
+            new JobMasterRegistrationSuccess(heartbeatInterval, resourceManagerId, resourceId);
+
+        return CompletableFuture.completedFuture(success);
+    }
+
+    @Override
+    public CompletableFuture<Acknowledge> requestSlot(JobMasterId jobMasterId, SlotRequest slotRequest, Time timeout) {
+        // Read the volatile field once so that the null check and the call see the same consumer.
+        final Consumer<SlotRequest> observer = requestSlotConsumer;
+        if (observer != null) {
+            observer.accept(slotRequest);
+        }
+
+        // The prepared future is consumed by the request that picks it up.
+        final CompletableFuture<Acknowledge> preparedFuture = slotFutureReference.getAndSet(null);
+
+        return preparedFuture == null
+            ? CompletableFuture.completedFuture(Acknowledge.get())
+            : preparedFuture;
+    }
+
+    @Override
+    public void cancelSlotRequest(AllocationID allocationID) {
+        // Read the volatile field once so that the null check and the call see the same consumer.
+        final Consumer<AllocationID> observer = cancelSlotConsumer;
+        if (observer != null) {
+            observer.accept(allocationID);
+        }
+    }
+
+    @Override
+    public CompletableFuture<RegistrationResponse> registerTaskExecutor(
+            String taskExecutorAddress,
+            ResourceID resourceId,
+            SlotReport slotReport,
+            int dataPort,
+            HardwareDescription hardwareDescription,
+            Time timeout) {
+        // Note: resourceId is the method parameter here, which shadows the field of the same name.
+        final RegistrationResponse success =
+            new TaskExecutorRegistrationSuccess(new InstanceID(), resourceId, heartbeatInterval);
+
+        return CompletableFuture.completedFuture(success);
+    }
+
+    @Override
+    public void notifySlotAvailable(InstanceID instanceId, SlotID slotID, AllocationID oldAllocationId) {
+        // no-op
+    }
+
+    @Override
+    public void registerInfoMessageListener(String infoMessageListenerAddress) {
+        // no-op
+    }
+
+    @Override
+    public void unRegisterInfoMessageListener(String infoMessageListenerAddress) {
+        // no-op
+    }
+
+    @Override
+    public void shutDownCluster(ApplicationStatus finalStatus, String optionalDiagnostics) {
+        // no-op
+    }
+
+    @Override
+    public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers() {
+        return CompletableFuture.completedFuture(0);
+    }
+
+    @Override
+    public void heartbeatFromTaskManager(ResourceID heartbeatOrigin, SlotReport slotReport) {
+        // no-op
+    }
+
+    @Override
+    public void heartbeatFromJobManager(ResourceID heartbeatOrigin) {
+        // no-op
+    }
+
+    @Override
+    public void disconnectTaskManager(ResourceID resourceID, Exception cause) {
+        // no-op
+    }
+
+    @Override
+    public void disconnectJobManager(JobID jobId, Exception cause) {
+        // no-op
+    }
+
+    @Override
+    public CompletableFuture<Collection<TaskManagerInfo>> requestTaskManagerInfo(Time timeout) {
+        final Collection<TaskManagerInfo> noTaskManagers = Collections.emptyList();
+
+        return CompletableFuture.completedFuture(noTaskManagers);
+    }
+
+    @Override
+    public CompletableFuture<TaskManagerInfo> requestTaskManagerInfo(InstanceID instanceId, Time timeout) {
+        final UnsupportedOperationException failure = new UnsupportedOperationException("Not yet implemented");
+
+        return FutureUtils.completedExceptionally(failure);
+    }
+
+    @Override
+    public CompletableFuture<ResourceOverview> requestResourceOverview(Time timeout) {
+        final UnsupportedOperationException failure = new UnsupportedOperationException("Not yet implemented");
+
+        return FutureUtils.completedExceptionally(failure);
+    }
+
+    @Override
+    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
+        final Collection<Tuple2<ResourceID, String>> noPaths = Collections.emptyList();
+
+        return CompletableFuture.completedFuture(noPaths);
+    }
+
+    @Override
+    public ResourceManagerId getFencingToken() {
+        return resourceManagerId;
+    }
+
+    @Override
+    public String getAddress() {
+        return address;
+    }
+
+    @Override
+    public String getHostname() {
+        return hostname;
+    }
+}