This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.12
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3e8448c65a50bc94676e6a916d9a6b18b12b7210
Author: Till Rohrmann <[email protected]>
AuthorDate: Fri Dec 11 18:41:23 2020 +0100

    [FLINK-19832][tests] Add test for immediately failed PhysicalSlot in 
SlotSharingExecutionSlotAllocator
    
    This commit adds the 
SlotSharingExecutionSlotAllocatorTest.failLogicalSlotsIfPhysicalSlotIsFailed 
which
    ensures that the SlotSharingExecutionSlotAllocator will fail all logical 
slots and return the physical slot
    immediately if the requested physical slot future is completed 
exceptionally.
    
    This closes #13879.
---
 .../SlotSharingExecutionSlotAllocatorTest.java     | 78 +++++++++++++++++-----
 1 file changed, 61 insertions(+), 17 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
index aee349f..6e16a9d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
@@ -35,12 +35,15 @@ import 
org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkChecke
 import 
org.apache.flink.runtime.scheduler.SharedSlotProfileRetriever.SharedSlotProfileRetrieverFactory;
 import 
org.apache.flink.runtime.scheduler.SharedSlotTestingUtils.TestingPhysicalSlot;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.BiConsumerWithException;
 
 import org.junit.Test;
 
+import javax.annotation.Nullable;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -59,10 +62,12 @@ import java.util.stream.Stream;
 import static 
org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createRandomExecutionVertexId;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -143,7 +148,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends 
TestLogger {
                AllocationContext context = AllocationContext
                        .newBuilder()
                        .addGroup(EV1)
-                       .completePhysicalSlotFutureManually()
+                       
.withPhysicalSlotFutureCompletionMode(PhysicalSlotFutureCompletionMode.MANUAL)
                        .build();
                CompletableFuture<LogicalSlot> logicalSlotFuture = 
context.allocateSlotsFor(EV1).get(0).getLogicalSlotFuture();
                SlotRequestId slotRequestId = 
context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
@@ -228,7 +233,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends 
TestLogger {
                AllocationContext context = AllocationContext
                        .newBuilder()
                        .addGroup(EV1, EV2, EV3)
-                       
.completePhysicalSlotFutureManually(completePhysicalSlotFutureManually)
+                       
.withPhysicalSlotFutureCompletionMode(completePhysicalSlotFutureManually ? 
PhysicalSlotFutureCompletionMode.MANUAL : 
PhysicalSlotFutureCompletionMode.SUCCESS)
                        .build();
 
                List<SlotExecutionVertexAssignment> assignments = 
context.allocateSlotsFor(EV1, EV2);
@@ -360,6 +365,29 @@ public class SlotSharingExecutionSlotAllocatorTest extends 
TestLogger {
                assertThat(bulk.getPendingRequests(), hasSize(0));
        }
 
+       @Test
+       public void failLogicalSlotsIfPhysicalSlotIsFailed() {
+               final TestingPhysicalSlotRequestBulkChecker bulkChecker = new 
TestingPhysicalSlotRequestBulkChecker();
+               AllocationContext context = AllocationContext.newBuilder()
+                       .addGroup(EV1, EV2)
+                       .withBulkChecker(bulkChecker)
+                       
.withPhysicalSlotFutureCompletionMode(PhysicalSlotFutureCompletionMode.FAILURE)
+                       .build();
+
+               final List<SlotExecutionVertexAssignment> allocatedSlots = 
context.allocateSlotsFor(
+                       EV1,
+                       EV2);
+
+               for (SlotExecutionVertexAssignment allocatedSlot : 
allocatedSlots) {
+                       
assertTrue(allocatedSlot.getLogicalSlotFuture().isCompletedExceptionally());
+               }
+
+               assertThat(bulkChecker.getBulk().getPendingRequests(), 
is(empty()));
+
+               final Set<SlotRequestId> requests = 
context.getSlotProvider().getRequests().keySet();
+               
assertThat(context.getSlotProvider().getCancellations().keySet(), is(requests));
+       }
+
        private static List<ExecutionVertexID> 
getAssignIds(Collection<SlotExecutionVertexAssignment> assignments) {
                return 
assignments.stream().map(SlotExecutionVertexAssignment::getExecutionVertexId).collect(Collectors.toList());
        }
@@ -370,7 +398,7 @@ public class SlotSharingExecutionSlotAllocatorTest extends 
TestLogger {
                        .addGroup(EV1, EV2)
                        .addGroup(EV3)
                        .withBulkChecker(bulkChecker)
-                       .completePhysicalSlotFutureManually()
+                       
.withPhysicalSlotFutureCompletionMode(PhysicalSlotFutureCompletionMode.MANUAL)
                        .build();
        }
 
@@ -386,6 +414,12 @@ public class SlotSharingExecutionSlotAllocatorTest extends 
TestLogger {
                return 
requests.get(slotRequestId2).getSlotProfile().getPhysicalSlotResourceProfile();
        }
 
+       private enum PhysicalSlotFutureCompletionMode {
+               SUCCESS,
+               FAILURE,
+               MANUAL,
+       }
+
        private static class AllocationContext {
                private final TestingPhysicalSlotProvider slotProvider;
                private final TestingSlotSharingStrategy slotSharingStrategy;
@@ -437,22 +471,20 @@ public class SlotSharingExecutionSlotAllocatorTest 
extends TestLogger {
 
                private static class Builder {
                        private final Collection<ExecutionVertexID[]> groups = 
new ArrayList<>();
-                       private boolean completePhysicalSlotFutureManually = 
false;
+                       private PhysicalSlotFutureCompletionMode 
physicalSlotFutureCompletion = PhysicalSlotFutureCompletionMode.SUCCESS;
                        private boolean slotWillBeOccupiedIndefinitely = false;
                        private PhysicalSlotRequestBulkChecker bulkChecker = 
new TestingPhysicalSlotRequestBulkChecker();
 
+                       @Nullable
+                       private PhysicalSlotProvider physicalSlotProvider = 
null;
+
                        private Builder addGroup(ExecutionVertexID... group) {
                                groups.add(group);
                                return this;
                        }
 
-                       private Builder completePhysicalSlotFutureManually() {
-                               completePhysicalSlotFutureManually(true);
-                               return this;
-                       }
-
-                       private Builder 
completePhysicalSlotFutureManually(boolean value) {
-                               this.completePhysicalSlotFutureManually = value;
+                       private Builder 
withPhysicalSlotFutureCompletionMode(PhysicalSlotFutureCompletionMode 
physicalSlotFutureCompletion) {
+                               this.physicalSlotFutureCompletion = 
physicalSlotFutureCompletion;
                                return this;
                        }
 
@@ -466,8 +498,13 @@ public class SlotSharingExecutionSlotAllocatorTest extends 
TestLogger {
                                return this;
                        }
 
+                       private Builder 
withPhysicalSlotProvider(PhysicalSlotProvider physicalSlotProvider) {
+                               this.physicalSlotProvider = 
physicalSlotProvider;
+                               return this;
+                       }
+
                        private AllocationContext build() {
-                               TestingPhysicalSlotProvider slotProvider = new 
TestingPhysicalSlotProvider(completePhysicalSlotFutureManually);
+                               TestingPhysicalSlotProvider slotProvider = new 
TestingPhysicalSlotProvider(physicalSlotFutureCompletion);
                                TestingSharedSlotProfileRetrieverFactory 
sharedSlotProfileRetrieverFactory =
                                        new 
TestingSharedSlotProfileRetrieverFactory();
                                TestingSlotSharingStrategy slotSharingStrategy 
= TestingSlotSharingStrategy.createWithGroups(groups);
@@ -492,10 +529,10 @@ public class SlotSharingExecutionSlotAllocatorTest 
extends TestLogger {
                private final Map<SlotRequestId, PhysicalSlotRequest> requests;
                private final Map<SlotRequestId, 
CompletableFuture<TestingPhysicalSlot>> responses;
                private final Map<SlotRequestId, Throwable> cancellations;
-               private final boolean completePhysicalSlotFutureManually;
+               private final PhysicalSlotFutureCompletionMode 
physicalSlotFutureCompletion;
 
-               private TestingPhysicalSlotProvider(boolean 
completePhysicalSlotFutureManually) {
-                       this.completePhysicalSlotFutureManually = 
completePhysicalSlotFutureManually;
+               private 
TestingPhysicalSlotProvider(PhysicalSlotFutureCompletionMode 
physicalSlotFutureCompletion) {
+                       this.physicalSlotFutureCompletion = 
physicalSlotFutureCompletion;
                        this.requests = new HashMap<>();
                        this.responses = new HashMap<>();
                        this.cancellations = new HashMap<>();
@@ -507,9 +544,16 @@ public class SlotSharingExecutionSlotAllocatorTest extends 
TestLogger {
                        requests.put(slotRequestId, physicalSlotRequest);
                        CompletableFuture<TestingPhysicalSlot> resultFuture = 
new CompletableFuture<>();
                        responses.put(slotRequestId, resultFuture);
-                       if (!completePhysicalSlotFutureManually) {
-                               completePhysicalSlotFutureFor(slotRequestId, 
new AllocationID());
+
+                       switch (physicalSlotFutureCompletion) {
+                               case SUCCESS:
+                                       
completePhysicalSlotFutureFor(slotRequestId, new AllocationID());
+                                       break;
+                               case FAILURE:
+                                       resultFuture.completeExceptionally(new 
FlinkException("Test failure."));
+                                       break;
                        }
+
                        return resultFuture.thenApply(physicalSlot -> new 
PhysicalSlotRequest.Result(slotRequestId, physicalSlot));
                }
 

Reply via email to