zhuzhurk commented on a change in pull request #13181:
URL: https://github.com/apache/flink/pull/13181#discussion_r479087979



##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorTest.java
##########
@@ -265,12 +292,99 @@ public void testPhysicalSlotReleaseLogicalSlots() throws 
ExecutionException, Int
                                return payload;
                        })
                        .collect(Collectors.toList());
+               SlotRequestId slotRequestId = 
context.getSlotProvider().getFirstRequestOrFail().getSlotRequestId();
                TestingPhysicalSlot physicalSlot = 
context.getSlotProvider().getFirstResponseOrFail().get();
 
                assertThat(payloads.stream().allMatch(payload -> 
payload.getTerminalStateFuture().isDone()), is(false));
                assertThat(physicalSlot.getPayload(), notNullValue());
                physicalSlot.getPayload().release(new Throwable());
                assertThat(payloads.stream().allMatch(payload -> 
payload.getTerminalStateFuture().isDone()), is(true));
+
+               
assertThat(context.getSlotProvider().getCancellations().containsKey(slotRequestId),
 is(true));
+
+               context.allocateSlotsFor(EV1, EV2);
+               // there should be one more physical slot allocation, as the 
first allocation should be removed after releasing all logical slots
+               assertThat(context.getSlotProvider().getRequests().keySet(), 
hasSize(2));
+       }
+
+       @Test
+       public void testSchedulePendingRequestBulkTimeoutCheck() {
+               TestingPhysicalSlotRequestBulkChecker bulkChecker = new 
TestingPhysicalSlotRequestBulkChecker();
+               AllocationContext context = 
createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
+
+               context.allocateSlotsFor(EV1, EV3);
+               PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
+
+               assertThat(bulk.getPendingRequests(), hasSize(2));
+               assertThat(bulk.getPendingRequests(), 
containsInAnyOrder(RESOURCE_PROFILE.multiply(2), RESOURCE_PROFILE));
+               assertThat(bulk.getAllocationIdsOfFulfilledRequests(), 
hasSize(0));
+               assertThat(bulkChecker.getTimeout(), is(ALLOCATION_TIMEOUT));
+       }
+
+       @Test
+       public void testRequestFulfilledInBulk() {
+               TestingPhysicalSlotRequestBulkChecker bulkChecker = new 
TestingPhysicalSlotRequestBulkChecker();
+               AllocationContext context = 
createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
+
+               context.allocateSlotsFor(EV1, EV3);
+               AllocationID allocationId = new AllocationID();
+               ResourceProfile pendingSlotResourceProfile = 
fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, allocationId);
+               PhysicalSlotRequestBulk bulk = bulkChecker.getBulk();
+
+               assertThat(bulk.getPendingRequests(), hasSize(1));
+               assertThat(bulk.getPendingRequests(), 
containsInAnyOrder(pendingSlotResourceProfile));
+               assertThat(bulk.getAllocationIdsOfFulfilledRequests(), 
hasSize(1));
+               assertThat(bulk.getAllocationIdsOfFulfilledRequests(), 
containsInAnyOrder(allocationId));
+       }
+
+       @Test
+       public void testRequestBulkCancel() {
+               TestingPhysicalSlotRequestBulkChecker bulkChecker = new 
TestingPhysicalSlotRequestBulkChecker();
+               AllocationContext context = 
createBulkCheckerContextWithEv12GroupAndEv3Group(bulkChecker);
+
+               // allocate 2 physical slots for 2 groups
+               List<SlotExecutionVertexAssignment> assignments1 = 
context.allocateSlotsFor(EV1, EV3);
+               fulfilOneOfTwoSlotRequestsAndGetPendingProfile(context, new 
AllocationID());
+               PhysicalSlotRequestBulk bulk1 = bulkChecker.getBulk();
+               List<SlotExecutionVertexAssignment> assignments2 = 
context.allocateSlotsFor(EV2);
+
+               // cancelling of (EV1, EV3) releases assignments1 and only one 
physical slot for EV3
+               // the second physical slot is held by sharing EV2 from the 
next bulk
+               bulk1.cancel(new Throwable());
+
+               // return completed logical slot to clear shared slpt and 
release physical slot

Review comment:
       typo: `slpt` -> `slot`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Reply via email to