azagrebin commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r475234042
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
##########
@@ -274,6 +277,73 @@ public void testPhysicalSlotReleaseLogicalSlots() throws
ExecutionException, Int
assertThat(payloads.stream().allMatch(payload ->
payload.getTerminalStateFuture().isDone()), is(true));
}
+ @Test
+ public void testSchedulePendingRequestBulkTimeoutCheck() {
+ TestingPhysicalSlotRequestBulkChecker bulkChecker = new
TestingPhysicalSlotRequestBulkChecker();
+ AllocationContext context =
createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
+
+ context.allocateSlotsFor(EV1, EV3);
+ PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
+
+ assertThat(bulk.getPendingRequests(), hasSize(2));
+ assertThat(bulk.getPendingRequests(),
containsInAnyOrder(RESOURCE_PROFILE.multiply(2), RESOURCE_PROFILE));
+ assertThat(bulk.getAllocationIdsOfFulfilledRequests(),
hasSize(0));
+ assertThat(bulkChecker.getTimeout(), is(ALLOCATION_TIMEOUT));
+ }
+
+ @Test
+ public void testRequestFulfilledInBulk() {
+ TestingPhysicalSlotRequestBulkChecker bulkChecker = new
TestingPhysicalSlotRequestBulkChecker();
+ AllocationContext context =
createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
+
+ context.allocateSlotsFor(EV1, EV3);
+ AllocationID allocationId = new AllocationID();
+ ResourceProfile pendingSlotResourceProfile =
fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, allocationId);
+ PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
+
+ assertThat(bulk.getPendingRequests(), hasSize(1));
+ assertThat(bulk.getPendingRequests(),
containsInAnyOrder(pendingSlotResourceProfile));
+ assertThat(bulk.getAllocationIdsOfFulfilledRequests(),
hasSize(1));
+ assertThat(bulk.getAllocationIdsOfFulfilledRequests(),
containsInAnyOrder(allocationId));
+ }
+
+ @Test
+ public void testRequestBulkCancel() {
+ TestingPhysicalSlotRequestBulkChecker bulkChecker = new
TestingPhysicalSlotRequestBulkChecker();
+ AllocationContext context =
createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
+
+ // allocate 2 physical slots for 2 groups
+ List<SlotExecutionVertexAssignment> assignments1 =
context.allocateSlotsFor(EV1, EV3);
+ fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, new
AllocationID());
+ PhysicalSlotRequestBulk bulk1 = bulkChecker.getBulk();
+ List<SlotExecutionVertexAssignment> assignments2 =
context.allocateSlotsFor(EV2);
+ // cancelling of (EV1, EV3) releases assignments1 and only one
physical slot for EV3
+ // the second physical slot is held by sharing EV2 from the
next bulk
+ bulk1.cancel(new Throwable());
+ // EV3 needs again a physical slot, therefore there are 3
requests overall
+ context.allocateSlotsFor(EV1, EV3);
+ boolean ev1failed =
assignments1.get(0).getLogicalSlotFuture().isCompletedExceptionally();
+ boolean ev2failed =
assignments1.get(0).getLogicalSlotFuture().isCompletedExceptionally();
+
+ assertThat(context.getSlotProvider().getRequests().values(),
hasSize(3));
+ // either EV1 or EV3 logical slot future is fulfilled before
cancellation
+ assertThat(ev1failed != ev2failed, is(false));
+
assertThat(assignments2.get(0).getLogicalSlotFuture().isCompletedExceptionally(),
is(false));
Review comment:
this is not necessarily true because
`fulfilOneOfTwoSlotRequestsAndGetPendingProfile` fulfils only one physical slot
request, either for (EV1, EV2) or (EV3). If it fulfils the physical slot
request for (EV1, EV2), (EV3) is still pending. The important bit was that one
of the [EV1, EV3] requests gets cancelled if the [EV1, EV3] bulk is cancelled,
but not EV2, as it is from another bulk.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]