zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r432239813
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -562,4 +575,110 @@ private void releaseSharedSlot(
public boolean requiresPreviousExecutionGraphAllocations() {
return slotSelectionStrategy instanceof
PreviousAllocationSlotSelectionStrategy;
}
+
+ @Override
+ public CompletableFuture<Collection<PhysicalSlotRequest.Result>>
allocatePhysicalSlots(
+ final Collection<PhysicalSlotRequest>
physicalSlotRequests,
+ final Time timeout) {
+
+ final PhysicalSlotRequestBulk slotRequestBulk = new
PhysicalSlotRequestBulk(physicalSlotRequests);
+
+ final List<CompletableFuture<PhysicalSlotRequest.Result>>
resultFutures = new ArrayList<>(physicalSlotRequests.size());
+ for (PhysicalSlotRequest request : physicalSlotRequests) {
+ final CompletableFuture<PhysicalSlotRequest.Result>
resultFuture =
+ allocatePhysicalSlot(request,
timeout).thenApply(result -> {
+ slotRequestBulk.markRequestFulfilled(
+ result.getSlotRequestId(),
+
result.getPhysicalSlot().getAllocationId());
+
+ return result;
+ });
+ resultFutures.add(resultFuture);
+ }
+
+ slotRequestBulkTracker.track(slotRequestBulk);
+ schedulePendingRequestBulkTimeoutCheck(slotRequestBulk,
timeout);
+
+ return FutureUtils.combineAll(resultFutures)
+ .whenComplete((ignore, throwable) ->
slotRequestBulkTracker.untrack(slotRequestBulk));
+ }
+
+ private CompletableFuture<PhysicalSlotRequest.Result>
allocatePhysicalSlot(
+ final PhysicalSlotRequest physicalSlotRequest,
+ final Time timeout) {
+
+ final SlotRequestId slotRequestId =
physicalSlotRequest.getSlotRequestId();
+ final SlotProfile slotProfile =
physicalSlotRequest.getSlotProfile();
+
+ final Optional<SlotAndLocality> availablePhysicalSlot =
tryAllocateFromAvailable(slotRequestId, slotProfile);
Review comment:
I find one way that we can still have a separate `BulkSlotProvider` and
do not need to change `Scheduler`/`SlotProvider ` creating and initializing
codes.
That is, we still enables `SlotProvider` for bulk slot allocation. However,
`SchedulerImpl` creates a `BulkSlotProviderImpl` internally and dedicates bulk
slot allocation requests to it.
See 395c6cba8c463cffe2cf3cc70d91e6c6ff0dbb9f
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]