This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.12 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 3e8448c65a50bc94676e6a916d9a6b18b12b7210 Author: Till Rohrmann <[email protected]> AuthorDate: Fri Dec 11 18:41:23 2020 +0100 [FLINK-19832][tests] Add test for immediately failed PhysicalSlot in SlotSharingExecutionSlotAllocator This commit adds the SlotSharingExecutionSlotAllocatorTest.failLogicalSlotsIfPhysicalSlotIsFailed which ensures that the SlotSharingExecutionSlotAllocator will fail all logical slots and return the physical slot immediately if the requested physical slot future is completed exceptionally. This closes #13879. --- .../SlotSharingExecutionSlotAllocatorTest.java | 78 +++++++++++++++++----- 1 file changed, 61 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java index aee349f..6e16a9d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java @@ -35,12 +35,15 @@ import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecke import org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory; import org.apache.flink.runtime.scheduler.SharedSlotTestingUtils.TestingPhysicalSlot; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; import org.apache.flink.util.function.BiConsumerWithException; import org.junit.Test; +import javax.annotation.Nullable; + import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -59,10 +62,12 @@ import java.util.stream.Stream; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.notNullValue; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -143,7 +148,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { AllocationContext context = AllocationContext .newBuilder() .addGroup(EV1) - .completePhysicalSlotFutureManually() + .withPhysicalSlotFutureCompletionMode(PhysicalSlotFutureCompletionMode.MANUAL) .build(); CompletableFuture<LogicalSlot> logicalSlotFuture = context.allocateSlotsFor(EV1).get(0).getLogicalSlotFuture(); SlotRequestId slotRequestId = context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId(); @@ -228,7 +233,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { AllocationContext context = AllocationContext .newBuilder() .addGroup(EV1, EV2, EV3) - .completePhysicalSlotFutureManually(completePhysicalSlotFutureManually) + .withPhysicalSlotFutureCompletionMode(completePhysicalSlotFutureManually ? PhysicalSlotFutureCompletionMode.MANUAL : PhysicalSlotFutureCompletionMode.SUCCESS) .build(); List<SlotExecutionVertexAssignment> assignments = context.allocateSlotsFor(EV1, EV2); @@ -360,6 +365,29 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { assertThat(bulk.getPendingRequests(), hasSize(0)); } + @Test + public void failLogicalSlotsIfPhysicalSlotIsFailed() { + final TestingPhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker(); + AllocationContext context = AllocationContext.newBuilder() + .addGroup(EV1, EV2) + .withBulkChecker(bulkChecker) + .withPhysicalSlotFutureCompletionMode(PhysicalSlotFutureCompletionMode.FAILURE) + .build(); + + final List<SlotExecutionVertexAssignment> allocatedSlots = context.allocateSlotsFor( + EV1, + EV2); + + for (SlotExecutionVertexAssignment allocatedSlot : allocatedSlots) { + assertTrue(allocatedSlot.getLogicalSlotFuture().isCompletedExceptionally()); + } + + assertThat(bulkChecker.getBulk().getPendingRequests(), is(empty())); + + final Set<SlotRequestId> requests = context.getSlotProvider().getRequests().keySet(); + assertThat(context.getSlotProvider().getCancellations().keySet(), is(requests)); + } + private static List<ExecutionVertexID> getAssignIds(Collection<SlotExecutionVertexAssignment> assignments) { return assignments.stream().map(SlotExecutionVertexAssignment::getExecutionVertexId).collect(Collectors.toList()); } @@ -370,7 +398,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { .addGroup(EV1, EV2) .addGroup(EV3) .withBulkChecker(bulkChecker) - .completePhysicalSlotFutureManually() + .withPhysicalSlotFutureCompletionMode(PhysicalSlotFutureCompletionMode.MANUAL) .build(); } @@ -386,6 +414,12 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { return requests.get(slotRequestId2).getSlotProfile().getPhysicalSlotResourceProfile(); } + private enum PhysicalSlotFutureCompletionMode { + SUCCESS, + FAILURE, + MANUAL, + } + private static class AllocationContext { private final TestingPhysicalSlotProvider slotProvider; private final TestingSlotSharingStrategy slotSharingStrategy; @@ -437,22 +471,20 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { private static class Builder { private final Collection<ExecutionVertexID[]> groups = new ArrayList<>(); - private boolean completePhysicalSlotFutureManually = false; + private PhysicalSlotFutureCompletionMode physicalSlotFutureCompletion = PhysicalSlotFutureCompletionMode.SUCCESS; private boolean slotWillBeOccupiedIndefinitely = false; private PhysicalSlotRequestBulkChecker bulkChecker = new TestingPhysicalSlotRequestBulkChecker(); + @Nullable + private PhysicalSlotProvider physicalSlotProvider = null; + private Builder addGroup(ExecutionVertexID... group) { groups.add(group); return this; } - private Builder completePhysicalSlotFutureManually() { - completePhysicalSlotFutureManually(true); - return this; - } - - private Builder completePhysicalSlotFutureManually(boolean value) { - this.completePhysicalSlotFutureManually = value; + private Builder withPhysicalSlotFutureCompletionMode(PhysicalSlotFutureCompletionMode physicalSlotFutureCompletion) { + this.physicalSlotFutureCompletion = physicalSlotFutureCompletion; return this; } @@ -466,8 +498,13 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { return this; } + private Builder withPhysicalSlotProvider(PhysicalSlotProvider physicalSlotProvider) { + this.physicalSlotProvider = physicalSlotProvider; + return this; + } + private AllocationContext build() { - TestingPhysicalSlotProvider slotProvider = new TestingPhysicalSlotProvider(completePhysicalSlotFutureManually); + TestingPhysicalSlotProvider slotProvider = new TestingPhysicalSlotProvider(physicalSlotFutureCompletion); TestingSharedSlotProfileRetrieverFactory sharedSlotProfileRetrieverFactory = new TestingSharedSlotProfileRetrieverFactory(); TestingSlotSharingStrategy slotSharingStrategy = TestingSlotSharingStrategy.createWithGroups(groups); @@ -492,10 +529,10 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { private final Map<SlotRequestId, PhysicalSlotRequest> requests; private final Map<SlotRequestId, CompletableFuture<TestingPhysicalSlot>> responses; private final Map<SlotRequestId, Throwable> cancellations; - private final boolean completePhysicalSlotFutureManually; + private final PhysicalSlotFutureCompletionMode physicalSlotFutureCompletion; - private TestingPhysicalSlotProvider(boolean completePhysicalSlotFutureManually) { - this.completePhysicalSlotFutureManually = completePhysicalSlotFutureManually; + private TestingPhysicalSlotProvider(PhysicalSlotFutureCompletionMode physicalSlotFutureCompletion) { + this.physicalSlotFutureCompletion = physicalSlotFutureCompletion; this.requests = new HashMap<>(); this.responses = new HashMap<>(); this.cancellations = new HashMap<>(); @@ -507,9 +544,16 @@ public class SlotSharingExecutionSlotAllocatorTest extends TestLogger { requests.put(slotRequestId, physicalSlotRequest); CompletableFuture<TestingPhysicalSlot> resultFuture = new CompletableFuture<>(); responses.put(slotRequestId, resultFuture); - if (!completePhysicalSlotFutureManually) { - completePhysicalSlotFutureFor(slotRequestId, new AllocationID()); + + switch (physicalSlotFutureCompletion) { + case SUCCESS: + completePhysicalSlotFutureFor(slotRequestId, new AllocationID()); + break; + case FAILURE: + resultFuture.completeExceptionally(new FlinkException("Test failure.")); + break; } + return resultFuture.thenApply(physicalSlot -> new PhysicalSlotRequest.Result(slotRequestId, physicalSlot)); }
