zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r432004595
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -562,4 +575,110 @@ private void releaseSharedSlot(
public boolean requiresPreviousExecutionGraphAllocations() {
return slotSelectionStrategy instanceof
PreviousAllocationSlotSelectionStrategy;
}
+
+ @Override
+ public CompletableFuture<Collection<PhysicalSlotRequest.Result>>
allocatePhysicalSlots(
+ final Collection<PhysicalSlotRequest>
physicalSlotRequests,
+ final Time timeout) {
+
+ final PhysicalSlotRequestBulk slotRequestBulk = new
PhysicalSlotRequestBulk(physicalSlotRequests);
+
+ final List<CompletableFuture<PhysicalSlotRequest.Result>>
resultFutures = new ArrayList<>(physicalSlotRequests.size());
+ for (PhysicalSlotRequest request : physicalSlotRequests) {
+ final CompletableFuture<PhysicalSlotRequest.Result>
resultFuture =
+ allocatePhysicalSlot(request,
timeout).thenApply(result -> {
+ slotRequestBulk.markRequestFulfilled(
+ result.getSlotRequestId(),
+
result.getPhysicalSlot().getAllocationId());
+
+ return result;
+ });
+ resultFutures.add(resultFuture);
+ }
+
+ slotRequestBulkTracker.track(slotRequestBulk);
+ schedulePendingRequestBulkTimeoutCheck(slotRequestBulk,
timeout);
+
+ return FutureUtils.combineAll(resultFutures)
+ .whenComplete((ignore, throwable) ->
slotRequestBulkTracker.untrack(slotRequestBulk));
+ }
+
+ private CompletableFuture<PhysicalSlotRequest.Result>
allocatePhysicalSlot(
+ final PhysicalSlotRequest physicalSlotRequest,
+ final Time timeout) {
+
+ final SlotRequestId slotRequestId =
physicalSlotRequest.getSlotRequestId();
+ final SlotProfile slotProfile =
physicalSlotRequest.getSlotProfile();
+
+ final Optional<SlotAndLocality> availablePhysicalSlot =
tryAllocateFromAvailable(slotRequestId, slotProfile);
Review comment:
I just realized that `BulkSlotProvider ` is actually making things more
complicated because we will need to change how we create it and its factory in
`JobMaster` and `DefaultJobManagerRunnerFactory`.
`BulkSlotProvider` needs to know main thread executor and slot pool, which
makes it hard to create it in SchedulerNG internally.
Then we will need to keep `BulkSlotProvider` and `Scheduler`/`SlotProvider`
at the same time in JobMaster before we can completely drop `SchedulerImpl`.
I think it would be easier and cleaner to just keep the bulk slot allocation
logics in `SchedulerImpl`. We can try to make them less coupled with existing
logics that will be dropped in the future.
WDYT? @azagrebin
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]