xintongsong commented on code in PR #22176:
URL: https://github.com/apache/flink/pull/22176#discussion_r1145655305


##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotMatchingStrategy.java:
##########
@@ -41,4 +42,18 @@ <T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
             ResourceProfile requestedProfile,
             Collection<T> freeSlots,
             Function<InstanceID, Integer> numberRegisteredSlotsLookup);
+
+    /**
+     * Finds a matching resource instance for the request {@link ResourceProfile} given the
+     * collection of available instances.
+     *
+     * @param resourceMatchingPredicate Predicate of whether instance resource is matching
+     * @param availableInstances collection of available instances
+     * @param instanceResourceInfoLookup lookup for the current resource info of instance
+     * @return Returns a matching instance or {@link Optional#empty()} if there is none
+     */
+    Optional<InstanceID> findMatchingResource(
+            Predicate<InstanceID> resourceMatchingPredicate,
+            Collection<InstanceID> availableInstances,
+            Function<InstanceID, TaskManagerResourceInfoSnapshot> instanceResourceInfoLookup);

Review Comment:
   I'd suggest moving this to `DefaultResourceAllocationStrategy` as an internal interface.
   - The new method does not match the description of `SlotMatchingStrategy`, which is "Strategy how to find a matching slot". Even if we change the name, the implementation of the new method is completely independent of the existing ones, which usually indicates that the interface has two responsibilities that can be separated.
   - A side benefit is that, by making this matching strategy and its implementations internal to `DefaultResourceAllocationStrategy`, we can simply maintain the resource utilization in `InternalResourceInfo`, which avoids a lot of redundant calculation as well as the introduction of `TaskManagerResourceInfoSnapshot`.
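
   A rough sketch of how such an internal interface could look. All names here (`AllocationStrategySketch`, `ResourceInfo`, `ResourceMatchingStrategy`) are hypothetical, simplified stand-ins and not the actual Flink classes; the point is only that the strategy can read the utilization straight from the resource info it is handed.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Optional;
import java.util.function.Predicate;

// Hypothetical, simplified stand-in for DefaultResourceAllocationStrategy.
class AllocationStrategySketch {

    // Simplified stand-in for InternalResourceInfo; it carries its own utilization.
    static final class ResourceInfo {
        final String id;
        final double utilization;

        ResourceInfo(String id, double utilization) {
            this.id = id;
            this.utilization = utilization;
        }
    }

    // Matching strategy kept internal to the allocation strategy (hypothetical name).
    interface ResourceMatchingStrategy {
        Optional<ResourceInfo> findMatchingResource(
                Collection<ResourceInfo> candidates, Predicate<ResourceInfo> matches);
    }

    // Takes the first candidate that can fulfill the request.
    static final ResourceMatchingStrategy ANY_MATCHING =
            (candidates, matches) -> candidates.stream().filter(matches).findFirst();

    // Prefers the matching candidate with the lowest utilization.
    static final ResourceMatchingStrategy LEAST_UTILIZATION =
            (candidates, matches) ->
                    candidates.stream()
                            .filter(matches)
                            .min(Comparator.comparingDouble((ResourceInfo r) -> r.utilization));
}
```

   With this shape, no separate snapshot type or lookup function is needed, because each candidate already exposes what the strategy compares on.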



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/LeastUtilizationSlotMatchingStrategy.java:
##########
@@ -79,4 +94,21 @@ private static double calculateUtilization(
 
         return (double) (numberRegisteredSlots - numberFreeSlots) / numberRegisteredSlots;
     }
+
+    private static double calculateUtilization(
+            TaskManagerResourceInfoSnapshot instanceResourceInfoSnapshot) {
+        if (instanceResourceInfoSnapshot.getTotalProfile() == ResourceProfile.UNKNOWN
+                || instanceResourceInfoSnapshot.getAvailableProfile() == ResourceProfile.UNKNOWN) {
+            return instanceResourceInfoSnapshot.getAllocatedSlotNumber();
+        } else {
+            // Since CPU and memory are proportional in most scenarios,
+            // use cpu usage to represent utilization to simplify the logic in the first version
+            return instanceResourceInfoSnapshot
+                    .getTotalProfile()
+                    .getCpuCores()
+                    .subtract(instanceResourceInfoSnapshot.getAvailableProfile().getCpuCores())
+                    .getValue()
+                    .doubleValue();
+        }
+    }

Review Comment:
   The utilization can be maintained in `InternalResourceInfo` and 
re-calculated only upon allocation changes.
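
   A minimal sketch of that idea, assuming a simplified, hypothetical stand-in for `InternalResourceInfo` that tracks CPU cores only as plain `double` values. The utilization is stored in a field and refreshed only when an allocation succeeds, so reading it during matching is cheap.

```java
// Hypothetical, simplified stand-in for InternalResourceInfo that tracks CPU cores only.
class CachedUtilizationInfo {
    private final double totalCpu;
    private double availableCpu;
    private double utilization; // cached; refreshed only when the allocation changes

    CachedUtilizationInfo(double totalCpu, double availableCpu) {
        this.totalCpu = totalCpu;
        this.availableCpu = availableCpu;
        this.utilization = computeUtilization();
    }

    // Tries to reserve the requested CPU; updates the cached utilization on success.
    boolean tryAllocate(double requestedCpu) {
        if (requestedCpu > availableCpu) {
            return false;
        }
        availableCpu -= requestedCpu;
        utilization = computeUtilization();
        return true;
    }

    // Cheap read; no recalculation happens here.
    double getUtilization() {
        return utilization;
    }

    // Assumption of this sketch: a resource without any CPU counts as fully utilized.
    private double computeUtilization() {
        return totalCpu == 0 ? 1.0 : (totalCpu - availableCpu) / totalCpu;
    }
}
```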



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -73,9 +80,28 @@ public ResourceAllocationResult tryFulfillRequirements(
             BlockedTaskManagerChecker blockedTaskManagerChecker) {
         final ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();
 
-        final List<InternalResourceInfo> registeredResources =
-                getAvailableResources(
-                        taskManagerResourceInfoProvider, resultBuilder, blockedTaskManagerChecker);
+        final List<TaskManagerInfo> registeredResources =
+                getAvailableResources(taskManagerResourceInfoProvider, blockedTaskManagerChecker);
+        final List<InstanceID> registeredResourceIdInOrder =
+                registeredResources.stream()
+                        .map(TaskManagerInfo::getInstanceId)
+                        .collect(Collectors.toList());
+        final Map<InstanceID, InternalResourceInfo> registeredResourcesInfo =
+                registeredResources.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        TaskManagerInfo::getInstanceId,
+                                        t ->
+                                                new InternalResourceInfo(
+                                                        t.getDefaultSlotResourceProfile(),
+                                                        t.getTotalResource(),
+                                                        t.getAvailableResource(),
+                                                        (jobID, slotProfile) ->
+                                                                resultBuilder
+                                                                        .addAllocationOnRegisteredResource(
+                                                                                jobID,
+                                                                                t.getInstanceId(),
+                                                                                slotProfile))));

Review Comment:
   I'm not entirely sure why this is moved out of `getAvailableResources`. IIUC, this is because we need both a list of `InstanceID` and a map from `InstanceID` to `InternalResourceInfo` for `tryFulfillRequirementsForJobWithResources`. But isn't the list the same as the key set of the map?
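
   To illustrate the point with plain JDK types (the class and method names below are hypothetical): if a particular iteration order is required, collecting into a `LinkedHashMap` keeps insertion order, so the key set can replace the separate list. Note that the two-argument `Collectors.toMap` used in the diff makes no guarantee about the type of the returned map, and therefore none about its iteration order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

class KeySetOrderSketch {

    // Builds an insertion-ordered map from id to a per-id value.
    // The value (string length) is only a placeholder for InternalResourceInfo.
    static Map<String, Integer> toOrderedMap(List<String> ids) {
        return ids.stream()
                .collect(
                        Collectors.toMap(
                                Function.identity(),
                                String::length,
                                (a, b) -> a, // ids are assumed unique; keep the first on conflict
                                LinkedHashMap::new));
    }

    // The key set iterates in insertion order, so no separate id list is needed.
    static List<String> idsInOrder(Map<String, Integer> byId) {
        return new ArrayList<>(byId.keySet());
    }
}
```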



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/LeastUtilizationSlotMatchingStrategy.java:
##########
@@ -79,4 +94,21 @@ private static double calculateUtilization(
 
         return (double) (numberRegisteredSlots - numberFreeSlots) / numberRegisteredSlots;
     }
+
+    private static double calculateUtilization(
+            TaskManagerResourceInfoSnapshot instanceResourceInfoSnapshot) {
+        if (instanceResourceInfoSnapshot.getTotalProfile() == ResourceProfile.UNKNOWN
+                || instanceResourceInfoSnapshot.getAvailableProfile() == ResourceProfile.UNKNOWN) {
+            return instanceResourceInfoSnapshot.getAllocatedSlotNumber();
+        } else {
+            // Since CPU and memory are proportional in most scenarios,
+            // use cpu usage to represent utilization to simplify the logic in the first version
+            return instanceResourceInfoSnapshot
+                    .getTotalProfile()
+                    .getCpuCores()
+                    .subtract(instanceResourceInfoSnapshot.getAvailableProfile().getCpuCores())
+                    .getValue()
+                    .doubleValue();
+        }
+    }

Review Comment:
   Moreover, I'm not sure about assuming that CPU and memory are proportional. In fact, one of the major reasons people need fine-grained resource management is that they are not always proportional.
   
   A simple way to calculate the utilization while considering both CPU and memory is to calculate the utilization of each separately and use whichever is larger.
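
   A minimal sketch of that calculation, using plain numbers instead of `ResourceProfile` (the class name, method names, and parameters are hypothetical). Treating a dimension with zero total as fully utilized is an assumption of this sketch, not something decided in the PR.

```java
class MaxDimensionUtilization {

    // Fraction of one resource dimension that is in use.
    // Assumption: a dimension with no capacity counts as fully utilized.
    static double ratio(double total, double available) {
        return total <= 0 ? 1.0 : (total - available) / total;
    }

    // Overall utilization is the larger of the CPU and the memory utilization.
    static double utilization(
            double totalCpu, double availableCpu, long totalMemoryBytes, long availableMemoryBytes) {
        return Math.max(
                ratio(totalCpu, availableCpu),
                ratio(totalMemoryBytes, availableMemoryBytes));
    }
}
```

   For example, a task manager with 1 of 4 cores in use (0.25) and 768 of 1024 memory units in use (0.75) would be rated at 0.75, so a memory-heavy allocation is not hidden behind a low CPU usage.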



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -135,7 +154,45 @@ private static List<InternalResourceInfo> getPendingResources(
                 .collect(Collectors.toList());
     }
 
-    private static int tryFulfilledRequirementWithResource(
+    private int tryFulfilledRequirementWithAvailableResources(

Review Comment:
   I think there isn't much difference between fulfilling requirements from existing available resources and from pending resources. We only need an argument to tell which strategy should be used.
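
   Roughly, this could look like the sketch below: one method serves both registered and pending resources, and the caller passes the selection strategy. Everything here is a hypothetical simplification (slot counts instead of resource profiles, a `Function` instead of a dedicated strategy interface), not the actual Flink code.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

class FulfillRequirementsSketch {

    // Hypothetical resource that is described only by its number of free slots.
    static final class Resource {
        final String id;
        int freeSlots;

        Resource(String id, int freeSlots) {
            this.id = id;
            this.freeSlots = freeSlots;
        }
    }

    // Picks the first candidate, e.g. for pending resources.
    static final Function<List<Resource>, Optional<Resource>> FIRST =
            candidates -> candidates.stream().findFirst();

    // Picks the candidate with the most free slots, e.g. for registered resources.
    static final Function<List<Resource>, Optional<Resource>> MOST_FREE =
            candidates ->
                    candidates.stream().max(Comparator.comparingInt((Resource r) -> r.freeSlots));

    // Places up to `required` slots on the given resources and returns the unfulfilled count.
    static int tryFulfill(
            int required,
            List<Resource> resources,
            Function<List<Resource>, Optional<Resource>> strategy) {
        int remaining = required;
        while (remaining > 0) {
            // Only resources that still have capacity are offered to the strategy.
            List<Resource> candidates =
                    resources.stream().filter(r -> r.freeSlots > 0).collect(Collectors.toList());
            Optional<Resource> target = strategy.apply(candidates);
            if (!target.isPresent()) {
                break;
            }
            target.get().freeSlots--;
            remaining--;
        }
        return remaining;
    }
}
```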



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -73,9 +80,28 @@ public ResourceAllocationResult tryFulfillRequirements(
             BlockedTaskManagerChecker blockedTaskManagerChecker) {
         final ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();
 
-        final List<InternalResourceInfo> registeredResources =
-                getAvailableResources(
-                        taskManagerResourceInfoProvider, resultBuilder, blockedTaskManagerChecker);
+        final List<TaskManagerInfo> registeredResources =
+                getAvailableResources(taskManagerResourceInfoProvider, blockedTaskManagerChecker);
+        final List<InstanceID> registeredResourceIdInOrder =
+                registeredResources.stream()
+                        .map(TaskManagerInfo::getInstanceId)
+                        .collect(Collectors.toList());

Review Comment:
   The variable is named `registeredResourceIdInOrder`. However, I don't see what the order is.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -73,9 +80,28 @@ public ResourceAllocationResult tryFulfillRequirements(
             BlockedTaskManagerChecker blockedTaskManagerChecker) {
         final ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();
 
-        final List<InternalResourceInfo> registeredResources =
-                getAvailableResources(
-                        taskManagerResourceInfoProvider, resultBuilder, blockedTaskManagerChecker);
+        final List<TaskManagerInfo> registeredResources =
+                getAvailableResources(taskManagerResourceInfoProvider, blockedTaskManagerChecker);
+        final List<InstanceID> registeredResourceIdInOrder =
+                registeredResources.stream()
+                        .map(TaskManagerInfo::getInstanceId)
+                        .collect(Collectors.toList());
+        final Map<InstanceID, InternalResourceInfo> registeredResourcesInfo =
+                registeredResources.stream()
+                        .collect(
+                                Collectors.toMap(
+                                        TaskManagerInfo::getInstanceId,
+                                        t ->
+                                                new InternalResourceInfo(
+                                                        t.getDefaultSlotResourceProfile(),
+                                                        t.getTotalResource(),
+                                                        t.getAvailableResource(),
+                                                        (jobID, slotProfile) ->
+                                                                resultBuilder
+                                                                        .addAllocationOnRegisteredResource(
+                                                                                jobID,
+                                                                                t.getInstanceId(),
+                                                                                slotProfile))));

Review Comment:
   And if we work on `InternalResourceInfo` directly when selecting the least-utilized resource, this complexity can be avoided.


