zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r432004595



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -562,4 +575,110 @@ private void releaseSharedSlot(
        public boolean requiresPreviousExecutionGraphAllocations() {
                return slotSelectionStrategy instanceof 
PreviousAllocationSlotSelectionStrategy;
        }
+
+       @Override
+       public CompletableFuture<Collection<PhysicalSlotRequest.Result>> 
allocatePhysicalSlots(
+                       final Collection<PhysicalSlotRequest> 
physicalSlotRequests,
+                       final Time timeout) {
+
+               final PhysicalSlotRequestBulk slotRequestBulk = new 
PhysicalSlotRequestBulk(physicalSlotRequests);
+
+               final List<CompletableFuture<PhysicalSlotRequest.Result>> 
resultFutures = new ArrayList<>(physicalSlotRequests.size());
+               for (PhysicalSlotRequest request : physicalSlotRequests) {
+                       final CompletableFuture<PhysicalSlotRequest.Result> 
resultFuture =
+                               allocatePhysicalSlot(request, 
timeout).thenApply(result -> {
+                                       slotRequestBulk.markRequestFulfilled(
+                                               result.getSlotRequestId(),
+                                               
result.getPhysicalSlot().getAllocationId());
+
+                                       return result;
+                               });
+                       resultFutures.add(resultFuture);
+               }
+
+               slotRequestBulkTracker.track(slotRequestBulk);
+               schedulePendingRequestBulkTimeoutCheck(slotRequestBulk, 
timeout);
+
+               return FutureUtils.combineAll(resultFutures)
+                       .whenComplete((ignore, throwable) -> 
slotRequestBulkTracker.untrack(slotRequestBulk));
+       }
+
+       private CompletableFuture<PhysicalSlotRequest.Result> 
allocatePhysicalSlot(
+                       final PhysicalSlotRequest physicalSlotRequest,
+                       final Time timeout) {
+
+               final SlotRequestId slotRequestId = 
physicalSlotRequest.getSlotRequestId();
+               final SlotProfile slotProfile = 
physicalSlotRequest.getSlotProfile();
+
+               final Optional<SlotAndLocality> availablePhysicalSlot = 
tryAllocateFromAvailable(slotRequestId, slotProfile);

Review comment:
       I just realized that `BulkSlotProvider ` is actually making things more 
complicated because we will need to change how we create it and its factory in 
`JobMaster` and `DefaultJobManagerRunnerFactory`.
   `BulkSlotProvider` needs to know main thread executor and slot pool, which 
makes it hard to create it in SchedulerNG internally.
   Then we will need to keep `BulkSlotProvider` and `Scheduler`/`SlotProvider` 
at the same time in JobMaster before we can completely drop `SchedulerImpl`.
   
   I think it would be easier to just keep the bulk slot allocation logics 
`SchedulerImpl`. We can try to make them less coupled with existing logics that 
will be dropped in the future.
   WDYT? @azagrebin 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to