xintongsong commented on code in PR #22176:
URL: https://github.com/apache/flink/pull/22176#discussion_r1146049524
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -135,26 +145,31 @@ private static List<InternalResourceInfo> getPendingResources(
                 .collect(Collectors.toList());
     }
-    private static int tryFulfilledRequirementWithResource(
+    private int tryFulfilledRequirementWithResource(
Review Comment:
Maybe we should make the entire `tryFulfilledRequirementWithResource` the
strategy method.
For the any-matching strategy, we want to allocate as many slots as possible
on each selected TM. And for pending resources, we should always use this
strategy, so that we use as few pending workers as possible and the rest can
be canceled.
For the least-utilization strategy, we can use a priority queue to reduce
the overhead of repeatedly calling `min()` on the entire worker list.
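To illustrate the priority-queue idea, here is a minimal sketch, not the PR's code. `Worker`, `utilization()` and `allocate()` are hypothetical stand-ins for `InternalResourceInfo` and the strategy method, and it assumes equally sized slots, so utilization is just the fraction of occupied slots. A worker is polled before it is mutated and re-inserted afterwards, because `PriorityQueue` does not re-order an element whose key changes in place.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

final class LeastUtilizationSketch {

    /** Hypothetical stand-in for InternalResourceInfo: a worker with equally sized slots. */
    static final class Worker {
        final String id;
        final int totalSlots;
        int freeSlots;

        Worker(String id, int totalSlots, int freeSlots) {
            this.id = id;
            this.totalSlots = totalSlots;
            this.freeSlots = freeSlots;
        }

        double utilization() {
            // A worker without any slots is treated as fully utilized.
            return totalSlots == 0 ? 1.0 : 1.0 - (double) freeSlots / totalSlots;
        }
    }

    /**
     * Allocates up to numRequired slots, one at a time, always on the currently
     * least utilized worker. Returns the worker ids in allocation order.
     */
    static List<String> allocate(List<Worker> workers, int numRequired) {
        Comparator<Worker> byUtilization = Comparator.comparingDouble(Worker::utilization);
        // Ties are broken by id so that the result is deterministic.
        Comparator<Worker> order = byUtilization.thenComparing(w -> w.id);
        PriorityQueue<Worker> queue = new PriorityQueue<>(order);
        for (Worker worker : workers) {
            if (worker.freeSlots > 0) {
                queue.add(worker);
            }
        }

        List<String> allocations = new ArrayList<>();
        while (allocations.size() < numRequired && !queue.isEmpty()) {
            // Poll first: the ordering key changes once a slot is taken.
            Worker leastUtilized = queue.poll();
            leastUtilized.freeSlots--;
            allocations.add(leastUtilized.id);
            if (leastUtilized.freeSlots > 0) {
                // Re-insert with the updated utilization, O(log n) per allocation.
                queue.add(leastUtilized);
            }
        }
        return allocations;
    }

    public static void main(String[] args) {
        List<Worker> workers = Arrays.asList(new Worker("A", 4, 4), new Worker("B", 4, 3));
        System.out.println(allocate(workers, 3));
    }
}
```

Each allocation then costs one poll and at most one re-insert instead of a scan over all workers. With multi-dimensional `ResourceProfile`s the ordering key would need a real utilization definition, which this sketch leaves open.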
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -237,27 +253,97 @@ && canFulfillRequirement(effectiveProfile, remainResource)) {
     private static class InternalResourceInfo {
         private final ResourceProfile defaultSlotProfile;
         private final BiConsumer<JobID, ResourceProfile> allocationConsumer;
+        private final ResourceProfile totalProfile;
         private ResourceProfile availableProfile;
+        private double utilization;
         InternalResourceInfo(
                 ResourceProfile defaultSlotProfile,
+                ResourceProfile totalProfile,
                 ResourceProfile availableProfile,
                 BiConsumer<JobID, ResourceProfile> allocationConsumer) {
             this.defaultSlotProfile = defaultSlotProfile;
+            this.totalProfile = totalProfile;
             this.availableProfile = availableProfile;
             this.allocationConsumer = allocationConsumer;
+            this.utilization = 0;
Review Comment:
There's no guarantee that utilization is initially 0, e.g. when
`availableProfile` is already smaller than `totalProfile` at construction.
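One way to avoid the hard-coded 0 is to derive the value from the two profiles in the constructor. A minimal sketch under stated assumptions: `WorkerInfoSketch` is a hypothetical stand-in for `InternalResourceInfo`, and it reduces the resource profile to a single CPU figure. How utilization should be defined over a multi-dimensional `ResourceProfile` is a separate decision that this sketch does not try to answer.

```java
final class WorkerInfoSketch {
    private final double totalCpu;
    private double availableCpu;
    private double utilization;

    WorkerInfoSketch(double totalCpu, double availableCpu) {
        if (totalCpu < 0 || availableCpu < 0 || availableCpu > totalCpu) {
            throw new IllegalArgumentException("inconsistent total/available resources");
        }
        this.totalCpu = totalCpu;
        this.availableCpu = availableCpu;
        // Derived from the actual state instead of assuming an idle worker.
        this.utilization = computeUtilization();
    }

    private double computeUtilization() {
        // A worker with no resources at all is treated as fully utilized.
        return totalCpu == 0 ? 1.0 : (totalCpu - availableCpu) / totalCpu;
    }

    /** Takes the requested CPU if it fits and keeps utilization in sync. */
    boolean tryAllocate(double cpu) {
        if (cpu < 0 || cpu > availableCpu) {
            return false;
        }
        availableCpu -= cpu;
        utilization = computeUtilization();
        return true;
    }

    double getUtilization() {
        return utilization;
    }

    public static void main(String[] args) {
        // A worker that already has 3 of its 4 cores in use.
        System.out.println(new WorkerInfoSketch(4.0, 1.0).getUtilization());
    }
}
```

Alternatively, the field could be dropped and utilization computed on demand, which removes the risk of the cached value going stale.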
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -135,26 +145,31 @@ private static List<InternalResourceInfo> getPendingResources(
                 .collect(Collectors.toList());
     }
-    private static int tryFulfilledRequirementWithResource(
+    private int tryFulfilledRequirementWithResource(
             List<InternalResourceInfo> internalResource,
             int numUnfulfilled,
             ResourceProfile requiredResource,
             JobID jobId) {
-        final Iterator<InternalResourceInfo> internalResourceInfoItr = internalResource.iterator();
-        while (numUnfulfilled > 0 && internalResourceInfoItr.hasNext()) {
-            final InternalResourceInfo currentTaskManager = internalResourceInfoItr.next();
-            while (numUnfulfilled > 0
-                    && currentTaskManager.tryAllocateSlotForJob(jobId, requiredResource)) {
-                numUnfulfilled--;
-            }
-            if (currentTaskManager.availableProfile.equals(ResourceProfile.ZERO)) {
-                internalResourceInfoItr.remove();
+        while (numUnfulfilled > 0) {
+            Optional<InternalResourceInfo> bestTaskManager =
+                    resourceMatchingStrategy.findMatchingResource(
+                            requiredResource, internalResource);
+            if (bestTaskManager.isPresent()) {
+                InternalResourceInfo currentTaskManager = bestTaskManager.get();
+                if (currentTaskManager.tryAllocateSlotForJob(jobId, requiredResource)) {
+                    numUnfulfilled--;
+                    if (currentTaskManager.availableProfile.equals(ResourceProfile.ZERO)) {
+                        internalResource.remove(currentTaskManager);
+                    }
Review Comment:
Why remove TMs with zero resources? What about TMs whose remaining
resources are not zero but too small for any slot requirement?
IIUC, we are not relying on this for terminating the loop anyway. The
terminating condition here is `bestTaskManager.isPresent() == false`. That
makes this removal a mere pruning step, which seems insignificant (such
workers will not be selected by `min()` anyway) but is hard to understand.
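A minimal sketch of the loop without the pruning, to show that termination does not depend on it. `Worker`, `findMatching()` and `allocate()` are hypothetical, and resources are simplified to a single integer per worker. The only assumption about the strategy is that it returns an empty `Optional` once no worker can fit the request.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

final class AllocationLoopSketch {

    /** Hypothetical stand-in for InternalResourceInfo with one resource dimension. */
    static final class Worker {
        final String id;
        int free;

        Worker(String id, int free) {
            this.id = id;
            this.free = free;
        }
    }

    /** Hypothetical strategy: among workers that fit the request, pick the one with most free resources. */
    static Optional<Worker> findMatching(List<Worker> workers, int size) {
        return workers.stream()
                .filter(w -> w.free >= size)
                .max(Comparator.comparingInt((Worker w) -> w.free));
    }

    /** Returns the number of requirements that are still unfulfilled. */
    static int allocate(List<Worker> workers, int numUnfulfilled, int size) {
        while (numUnfulfilled > 0) {
            Optional<Worker> best = findMatching(workers, size);
            if (!best.isPresent()) {
                // No worker fits any more; this is the only exit besides numUnfulfilled == 0.
                break;
            }
            best.get().free -= size;
            numUnfulfilled--;
            // No removal from the list: exhausted or too-small workers are
            // simply never returned by findMatching() again.
        }
        return numUnfulfilled;
    }

    public static void main(String[] args) {
        List<Worker> workers = Arrays.asList(new Worker("A", 3), new Worker("B", 5));
        System.out.println(allocate(workers, 10, 2));
    }
}
```

In the `main` example both workers end with one unit left, which is non-zero but too small for the request, and the loop still terminates. If the removal is kept as an optimization, a comment saying so would make the intent clearer.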