KarmaGYZ commented on code in PR #23201:
URL: https://github.com/apache/flink/pull/23201#discussion_r1311485147
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -359,24 +425,45 @@ private void tryFulFillRedundantResources(
.map(internalResourceInfo ->
internalResourceInfo.availableProfile)
.reduce(ResourceProfile.ZERO, ResourceProfile::merge);
- tryFulFillRedundantResourcesWithAction(
- requiredRedundantResource,
+ ResourceProfile totalResources =
+ Stream.concat(
+ availableRegisteredResources.stream(),
+ availablePendingResources.stream())
+ .map(internalResourceInfo ->
internalResourceInfo.totalProfile)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
+
+ tryFulFillRequiredResourcesWithAction(
totalAvailableResources,
+ totalResources,
resultBuilder::addPendingTaskManagerAllocate);
}
- private void tryFulFillRedundantResourcesWithAction(
- ResourceProfile requiredRedundantResource,
- ResourceProfile totalAvailableResources,
+ private void tryFulFillRequiredResourcesWithAction(
+ ResourceProfile resourceAvailable,
+ ResourceProfile resourceInTotal,
Consumer<? super PendingTaskManager> fulfillAction) {
- while (!canFulfillRequirement(requiredRedundantResource,
totalAvailableResources)) {
+ while (!isRequiredResourcesFulfilled(resourceAvailable,
resourceInTotal)) {
PendingTaskManager pendingTaskManager =
new PendingTaskManager(totalResourceProfile,
numSlotsPerWorker);
fulfillAction.accept(pendingTaskManager);
- totalAvailableResources =
totalAvailableResources.merge(totalResourceProfile);
+ resourceAvailable = resourceAvailable.merge(totalResourceProfile);
+ resourceInTotal = resourceInTotal.merge(totalResourceProfile);
}
}
+ private ResourceProfile
getTotalResourceOfTaskManagers(List<TaskManagerInfo> taskManagers) {
+ return taskManagers.stream()
+ .map(TaskManagerInfo::getTotalResource)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
Review Comment:
In this strategy, all TMs share the same resource spec, so for efficiency we can
simply multiply that spec by the number of TMs instead of merging the profiles one by one.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -359,24 +425,45 @@ private void tryFulFillRedundantResources(
.map(internalResourceInfo ->
internalResourceInfo.availableProfile)
.reduce(ResourceProfile.ZERO, ResourceProfile::merge);
- tryFulFillRedundantResourcesWithAction(
- requiredRedundantResource,
+ ResourceProfile totalResources =
+ Stream.concat(
+ availableRegisteredResources.stream(),
+ availablePendingResources.stream())
+ .map(internalResourceInfo ->
internalResourceInfo.totalProfile)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
+
+ tryFulFillRequiredResourcesWithAction(
totalAvailableResources,
+ totalResources,
resultBuilder::addPendingTaskManagerAllocate);
}
- private void tryFulFillRedundantResourcesWithAction(
- ResourceProfile requiredRedundantResource,
- ResourceProfile totalAvailableResources,
+ private void tryFulFillRequiredResourcesWithAction(
+ ResourceProfile resourceAvailable,
+ ResourceProfile resourceInTotal,
Consumer<? super PendingTaskManager> fulfillAction) {
- while (!canFulfillRequirement(requiredRedundantResource,
totalAvailableResources)) {
+ while (!isRequiredResourcesFulfilled(resourceAvailable,
resourceInTotal)) {
PendingTaskManager pendingTaskManager =
new PendingTaskManager(totalResourceProfile,
numSlotsPerWorker);
fulfillAction.accept(pendingTaskManager);
- totalAvailableResources =
totalAvailableResources.merge(totalResourceProfile);
+ resourceAvailable = resourceAvailable.merge(totalResourceProfile);
+ resourceInTotal = resourceInTotal.merge(totalResourceProfile);
}
}
+ private ResourceProfile
getTotalResourceOfTaskManagers(List<TaskManagerInfo> taskManagers) {
+ return taskManagers.stream()
+ .map(TaskManagerInfo::getTotalResource)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
+ }
+
+ private ResourceProfile getTotalResourceOfPendingTaskManagers(
+ List<PendingTaskManager> pendingTaskManagers) {
+ return pendingTaskManagers.stream()
+ .map(PendingTaskManager::getTotalResourceProfile)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
Review Comment:
In this strategy, all TMs share the same resource spec, so for efficiency we can
simply multiply that spec by the number of TMs instead of merging the profiles one by one.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]