xiangyuf commented on code in PR #23201:
URL: https://github.com/apache/flink/pull/23201#discussion_r1312731126
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -359,24 +425,45 @@ private void tryFulFillRedundantResources(
.map(internalResourceInfo ->
internalResourceInfo.availableProfile)
.reduce(ResourceProfile.ZERO, ResourceProfile::merge);
- tryFulFillRedundantResourcesWithAction(
- requiredRedundantResource,
+ ResourceProfile totalResources =
+ Stream.concat(
+ availableRegisteredResources.stream(),
+ availablePendingResources.stream())
+ .map(internalResourceInfo ->
internalResourceInfo.totalProfile)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
+
+ tryFulFillRequiredResourcesWithAction(
totalAvailableResources,
+ totalResources,
resultBuilder::addPendingTaskManagerAllocate);
}
- private void tryFulFillRedundantResourcesWithAction(
- ResourceProfile requiredRedundantResource,
- ResourceProfile totalAvailableResources,
+ private void tryFulFillRequiredResourcesWithAction(
+ ResourceProfile resourceAvailable,
+ ResourceProfile resourceInTotal,
Consumer<? super PendingTaskManager> fulfillAction) {
- while (!canFulfillRequirement(requiredRedundantResource,
totalAvailableResources)) {
+ while (!isRequiredResourcesFulfilled(resourceAvailable,
resourceInTotal)) {
PendingTaskManager pendingTaskManager =
new PendingTaskManager(totalResourceProfile,
numSlotsPerWorker);
fulfillAction.accept(pendingTaskManager);
- totalAvailableResources =
totalAvailableResources.merge(totalResourceProfile);
+ resourceAvailable = resourceAvailable.merge(totalResourceProfile);
+ resourceInTotal = resourceInTotal.merge(totalResourceProfile);
}
}
+ private ResourceProfile
getTotalResourceOfTaskManagers(List<TaskManagerInfo> taskManagers) {
+ return taskManagers.stream()
+ .map(TaskManagerInfo::getTotalResource)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
+ }
+
+ private ResourceProfile getTotalResourceOfPendingTaskManagers(
+ List<PendingTaskManager> pendingTaskManagers) {
+ return pendingTaskManagers.stream()
+ .map(PendingTaskManager::getTotalResourceProfile)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
Review Comment:
@KarmaGYZ thanks for reviewing. IMHO we better not making this assumption
here and the efficiency here is not the top priority.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]