xintongsong commented on code in PR #22176:
URL: https://github.com/apache/flink/pull/22176#discussion_r1146049524


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -135,26 +145,31 @@ private static List<InternalResourceInfo> getPendingResources(
                 .collect(Collectors.toList());
     }
 
-    private static int tryFulfilledRequirementWithResource(
+    private int tryFulfilledRequirementWithResource(

Review Comment:
   Maybe we should make the entire `tryFulfilledRequirementWithResource` method the strategy method, rather than only the matching step.
   
   For the any-matching strategy, we want to allocate as many slots as possible on each selected TM. For pending resources, we should always use this strategy, so that we use as few pending workers as possible and the rest can be canceled.
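
   Here is a minimal sketch of the any-matching behavior. It uses a hypothetical, simplified `Worker` type with scalar CPU and memory, not Flink's `InternalResourceInfo`/`ResourceProfile`. Each worker is filled before the next one is touched, which mirrors the removed iterator loop:

```java
import java.util.List;

/** Hypothetical, simplified stand-in for InternalResourceInfo: scalar CPU and memory only. */
class Worker {
    final String id;
    double availableCpu;
    long availableMemoryMb;

    Worker(String id, double availableCpu, long availableMemoryMb) {
        this.id = id;
        this.availableCpu = availableCpu;
        this.availableMemoryMb = availableMemoryMb;
    }

    /** Deducts one slot's resources if they fit and reports whether the slot was allocated. */
    boolean tryAllocate(double cpu, long memoryMb) {
        if (availableCpu >= cpu && availableMemoryMb >= memoryMb) {
            availableCpu -= cpu;
            availableMemoryMb -= memoryMb;
            return true;
        }
        return false;
    }
}

/** Any-matching: fill each worker as far as possible before moving on to the next. */
class AnyMatchingFulfiller {
    /** Returns the number of requested slots that could not be fulfilled. */
    static int fulfill(List<Worker> workers, int numUnfulfilled, double cpu, long memoryMb) {
        for (Worker worker : workers) {
            if (numUnfulfilled == 0) {
                break;
            }
            // Keep packing slots onto the current worker until it is full or we are done.
            while (numUnfulfilled > 0 && worker.tryAllocate(cpu, memoryMb)) {
                numUnfulfilled--;
            }
        }
        return numUnfulfilled;
    }
}
```

   Workers are consumed in list order, so pending workers at the tail of the list stay untouched and could be canceled.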
   
   For the least-utilization strategy, we can use a priority queue to avoid the overhead of repeatedly calling `min()` over the entire worker list.
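
   Here is a rough sketch of the priority-queue idea. It uses a hypothetical single-dimension `UtilWorker` (CPU only) and assumes utilization is used CPU divided by total CPU. A worker is polled, charged one slot, and re-added, so each selection costs O(log n) rather than an O(n) `min()` scan:

```java
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical worker with CPU as its only resource dimension. */
class UtilWorker {
    final String id;
    final double totalCpu;
    double availableCpu;

    UtilWorker(String id, double totalCpu, double availableCpu) {
        this.id = id;
        this.totalCpu = totalCpu;
        this.availableCpu = availableCpu;
    }

    /** Assumed utilization: fraction of CPU already in use. */
    double utilization() {
        return (totalCpu - availableCpu) / totalCpu;
    }
}

/** Least-utilization: place each slot on the least-utilized worker that can fit it. */
class LeastUtilizationFulfiller {
    /** Returns the number of requested slots that could not be fulfilled. */
    static int fulfill(List<UtilWorker> workers, int numUnfulfilled, double cpu) {
        PriorityQueue<UtilWorker> queue =
                new PriorityQueue<>(Comparator.comparingDouble(UtilWorker::utilization));
        queue.addAll(workers);
        while (numUnfulfilled > 0 && !queue.isEmpty()) {
            // O(log n) per selection instead of an O(n) min() scan over all workers.
            UtilWorker least = queue.poll();
            if (least.availableCpu < cpu) {
                // Availability only shrinks during this call, so this worker can never
                // fit the request again; leave it out of the queue.
                continue;
            }
            least.availableCpu -= cpu;
            numUnfulfilled--;
            // Re-insert so the heap orders the worker by its updated utilization.
            queue.add(least);
        }
        return numUnfulfilled;
    }
}
```

   The re-insert after mutation matters: `PriorityQueue` does not reorder an element whose key changes while it is still in the queue.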



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -237,27 +253,97 @@ && canFulfillRequirement(effectiveProfile, remainResource)) {
     private static class InternalResourceInfo {
         private final ResourceProfile defaultSlotProfile;
         private final BiConsumer<JobID, ResourceProfile> allocationConsumer;
+        private final ResourceProfile totalProfile;
         private ResourceProfile availableProfile;
+        private double utilization;
 
         InternalResourceInfo(
                 ResourceProfile defaultSlotProfile,
+                ResourceProfile totalProfile,
                 ResourceProfile availableProfile,
                 BiConsumer<JobID, ResourceProfile> allocationConsumer) {
             this.defaultSlotProfile = defaultSlotProfile;
+            this.totalProfile = totalProfile;
             this.availableProfile = availableProfile;
             this.allocationConsumer = allocationConsumer;
+            this.utilization = 0;

Review Comment:
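   There's no guarantee that utilization is initially 0. `availableProfile` may already be smaller than `totalProfile` (e.g., for a registered TM that already has allocated slots), so the initial value should be derived from the two profiles.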
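
   For example, the constructor could derive the initial value. The sketch below uses a hypothetical `TrackedWorker` with scalar CPU and memory. It assumes utilization is the larger of the CPU and memory usage ratios (the actual definition may differ) and that totals are non-zero:

```java
/** Hypothetical, simplified resource info with scalar CPU and memory. */
class TrackedWorker {
    private final double totalCpu;
    private final long totalMemoryMb;
    private double availableCpu;
    private long availableMemoryMb;
    private double utilization;

    TrackedWorker(double totalCpu, long totalMemoryMb, double availableCpu, long availableMemoryMb) {
        this.totalCpu = totalCpu;
        this.totalMemoryMb = totalMemoryMb;
        this.availableCpu = availableCpu;
        this.availableMemoryMb = availableMemoryMb;
        // A registered TM may already host slots, so derive the value instead of assuming 0.
        this.utilization = computeUtilization();
    }

    /** Assumption: utilization is the larger of the CPU and memory usage ratios. */
    private double computeUtilization() {
        double cpuRatio = (totalCpu - availableCpu) / totalCpu;
        double memoryRatio = (double) (totalMemoryMb - availableMemoryMb) / totalMemoryMb;
        return Math.max(cpuRatio, memoryRatio);
    }

    boolean tryAllocate(double cpu, long memoryMb) {
        if (availableCpu < cpu || availableMemoryMb < memoryMb) {
            return false;
        }
        availableCpu -= cpu;
        availableMemoryMb -= memoryMb;
        // Keep the cached value in sync with the remaining resources.
        utilization = computeUtilization();
        return true;
    }

    double getUtilization() {
        return utilization;
    }
}
```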



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -135,26 +145,31 @@ private static List<InternalResourceInfo> getPendingResources(
                 .collect(Collectors.toList());
     }
 
-    private static int tryFulfilledRequirementWithResource(
+    private int tryFulfilledRequirementWithResource(
             List<InternalResourceInfo> internalResource,
             int numUnfulfilled,
             ResourceProfile requiredResource,
             JobID jobId) {
-        final Iterator<InternalResourceInfo> internalResourceInfoItr = internalResource.iterator();
-        while (numUnfulfilled > 0 && internalResourceInfoItr.hasNext()) {
-            final InternalResourceInfo currentTaskManager = internalResourceInfoItr.next();
-            while (numUnfulfilled > 0
-                    && currentTaskManager.tryAllocateSlotForJob(jobId, requiredResource)) {
-                numUnfulfilled--;
-            }
-            if (currentTaskManager.availableProfile.equals(ResourceProfile.ZERO)) {
-                internalResourceInfoItr.remove();
+        while (numUnfulfilled > 0) {
+            Optional<InternalResourceInfo> bestTaskManager =
+                    resourceMatchingStrategy.findMatchingResource(
+                            requiredResource, internalResource);
+            if (bestTaskManager.isPresent()) {
+                InternalResourceInfo currentTaskManager = bestTaskManager.get();
+                if (currentTaskManager.tryAllocateSlotForJob(jobId, requiredResource)) {
+                    numUnfulfilled--;
+                    if (currentTaskManager.availableProfile.equals(ResourceProfile.ZERO)) {
+                        internalResource.remove(currentTaskManager);
+                    }

Review Comment:
   Why remove TMs with zero remaining resources? What about TMs whose remaining resources are not zero but are too small for any slot requirement?
   
   IIUC, we are not relying on this to terminate the loop anyway. The terminating condition here is `bestTaskManager.isPresent() == false`. That makes this removal merely a pruning step, which seems insignificant (such workers would not be selected by `min()` anyway) but makes the code harder to understand.
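
   To illustrate, here is a sketch with a hypothetical `Candidate` type and a stand-in `findMatchingResource` that picks the least-utilized worker able to fit the request. Workers with zero or too-small leftovers are filtered out during selection. The loop ends on an empty `Optional` without removing anything from the list:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical worker with CPU as its only resource dimension. */
class Candidate {
    final String id;
    final double totalCpu;
    double availableCpu;

    Candidate(String id, double totalCpu, double availableCpu) {
        this.id = id;
        this.totalCpu = totalCpu;
        this.availableCpu = availableCpu;
    }

    double utilization() {
        return (totalCpu - availableCpu) / totalCpu;
    }
}

class OptionalTerminatedFulfiller {
    /** Stand-in for findMatchingResource: least-utilized worker that can still fit the request. */
    static Optional<Candidate> findMatchingResource(double cpu, List<Candidate> workers) {
        return workers.stream()
                // Workers with zero or too-small leftovers never pass this filter.
                .filter(w -> w.availableCpu >= cpu)
                .min(Comparator.comparingDouble(Candidate::utilization));
    }

    /** Returns the number of requested slots that could not be fulfilled. */
    static int fulfill(List<Candidate> workers, int numUnfulfilled, double cpu) {
        while (numUnfulfilled > 0) {
            Optional<Candidate> best = findMatchingResource(cpu, workers);
            if (!best.isPresent()) {
                // The empty Optional alone ends the loop; nothing is removed from the list.
                break;
            }
            best.get().availableCpu -= cpu;
            numUnfulfilled--;
        }
        return numUnfulfilled;
    }
}
```

   A fixed-size list from `Arrays.asList` works as input here, since nothing is ever removed from it.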



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
