zhuzhurk commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r479087979
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
##########
@@ -265,12 +292,99 @@ public void testPhysicalSlotReleaseLogicalSlots() throws
ExecutionException, Int
return payload;
})
.collect(Collectors.toList());
+ SlotRequestId slotRequestId =
context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
TestingPhysicalSlot physicalSlot =
context.getSlotProvider().getFirstResponseOrFail().get();
assertThat(payloads.stream().allMatch(payload ->
payload.getTerminalStateFuture().isDone()), is(false));
assertThat(physicalSlot.getPayload(), notNullValue());
physicalSlot.getPayload().release(new Throwable());
assertThat(payloads.stream().allMatch(payload ->
payload.getTerminalStateFuture().isDone()), is(true));
+
+
assertThat(context.getSlotProvider().getCancellations().containsKey(slotRequestId),
is(true));
+
+ context.allocateSlotsFor(EV1, EV2);
+ // there should be one more physical slot allocation, as the
first allocation should be removed after releasing all logical slots
+ assertThat(context.getSlotProvider().getRequests().keySet(),
hasSize(2));
+ }
+
+ @Test
+ public void testSchedulePendingRequestBulkTimeoutCheck() {
+ TestingPhysicalSlotRequestBulkChecker bulkChecker = new
TestingPhysicalSlotRequestBulkChecker();
+ AllocationContext context =
createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
+
+ context.allocateSlotsFor(EV1, EV3);
+ PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
+
+ assertThat(bulk.getPendingRequests(), hasSize(2));
+ assertThat(bulk.getPendingRequests(),
containsInAnyOrder(RESOURCE_PROFILE.multiply(2), RESOURCE_PROFILE));
+ assertThat(bulk.getAllocationIdsOfFulfilledRequests(),
hasSize(0));
+ assertThat(bulkChecker.getTimeout(), is(ALLOCATION_TIMEOUT));
+ }
+
+ @Test
+ public void testRequestFulfilledInBulk() {
+ TestingPhysicalSlotRequestBulkChecker bulkChecker = new
TestingPhysicalSlotRequestBulkChecker();
+ AllocationContext context =
createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
+
+ context.allocateSlotsFor(EV1, EV3);
+ AllocationID allocationId = new AllocationID();
+ ResourceProfile pendingSlotResourceProfile =
fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, allocationId);
+ PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
+
+ assertThat(bulk.getPendingRequests(), hasSize(1));
+ assertThat(bulk.getPendingRequests(),
containsInAnyOrder(pendingSlotResourceProfile));
+ assertThat(bulk.getAllocationIdsOfFulfilledRequests(),
hasSize(1));
+ assertThat(bulk.getAllocationIdsOfFulfilledRequests(),
containsInAnyOrder(allocationId));
+ }
+
+ @Test
+ public void testRequestBulkCancel() {
+ TestingPhysicalSlotRequestBulkChecker bulkChecker = new
TestingPhysicalSlotRequestBulkChecker();
+ AllocationContext context =
createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
+
+ // allocate 2 physical slots for 2 groups
+ List<SlotExecutionVertexAssignment> assignments1 =
context.allocateSlotsFor(EV1, EV3);
+ fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, new
AllocationID());
+ PhysicalSlotRequestBulk bulk1 = bulkChecker.getBulk();
+ List<SlotExecutionVertexAssignment> assignments2 =
context.allocateSlotsFor(EV2);
+
+ // cancelling of (EV1, EV3) releases assignments1 and only one
physical slot for EV3
+ // the second physical slot is held by sharing EV2 from the
next bulk
+ bulk1.cancel(new Throwable());
+
+ // return completed logical slot to clear shared slpt and
release physical slot
Review comment:
typo: `slpt` -> `slot`
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]