zhuzhurk commented on a change in pull request #12256:
URL: https://github.com/apache/flink/pull/12256#discussion_r430231588
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SchedulerImpl.java
##########
@@ -562,4 +575,110 @@ private void releaseSharedSlot(
public boolean requiresPreviousExecutionGraphAllocations() {
return slotSelectionStrategy instanceof
PreviousAllocationSlotSelectionStrategy;
}
+
+ @Override
+ public CompletableFuture<Collection<PhysicalSlotRequest.Result>>
allocatePhysicalSlots(
+ final Collection<PhysicalSlotRequest>
physicalSlotRequests,
+ final Time timeout) {
+
+ final PhysicalSlotRequestBulk slotRequestBulk = new
PhysicalSlotRequestBulk(physicalSlotRequests);
+
+ final List<CompletableFuture<PhysicalSlotRequest.Result>>
resultFutures = new ArrayList<>(physicalSlotRequests.size());
+ for (PhysicalSlotRequest request : physicalSlotRequests) {
+ final CompletableFuture<PhysicalSlotRequest.Result>
resultFuture =
+ allocatePhysicalSlot(request,
timeout).thenApply(result -> {
+ slotRequestBulk.markRequestFulfilled(
+ result.getSlotRequestId(),
+
result.getPhysicalSlot().getAllocationId());
+
+ return result;
+ });
+ resultFutures.add(resultFuture);
+ }
+
+ slotRequestBulkTracker.track(slotRequestBulk);
+ schedulePendingRequestBulkTimeoutCheck(slotRequestBulk,
timeout);
+
+ return FutureUtils.combineAll(resultFutures)
+ .whenComplete((ignore, throwable) ->
slotRequestBulkTracker.untrack(slotRequestBulk));
+ }
+
+ private CompletableFuture<PhysicalSlotRequest.Result>
allocatePhysicalSlot(
+ final PhysicalSlotRequest physicalSlotRequest,
+ final Time timeout) {
+
+ final SlotRequestId slotRequestId =
physicalSlotRequest.getSlotRequestId();
+ final SlotProfile slotProfile =
physicalSlotRequest.getSlotProfile();
+
+ final Optional<SlotAndLocality> availablePhysicalSlot =
tryAllocateFromAvailable(slotRequestId, slotProfile);
Review comment:
I think that the majority part of `SchedulerImpl` will not be needed
anymore in the future.
Introducing a separate interface like `BulkSlotProvider ` might make it
easier for us to drop the deprecated components in the future.
> Do you think we can reuse SchedulerImpl in future?
I think not.
> Would just duplicating tryAllocateFromAvailable/cancelSlotRequest in
BulkSlotProviderImpl bring less confusion in future?
I think yes. Let's do it this way.
> Will we need single slot provider eventually for pipeline region
scheduling at all?
I think not.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]