xintongsong commented on code in PR #22176:
URL: https://github.com/apache/flink/pull/22176#discussion_r1145655305
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotMatchingStrategy.java:
##########
@@ -41,4 +42,18 @@ <T extends TaskManagerSlotInformation> Optional<T> findMatchingSlot(
ResourceProfile requestedProfile,
Collection<T> freeSlots,
Function<InstanceID, Integer> numberRegisteredSlotsLookup);
+
+ /**
+ * Finds a matching resource instance for the request {@link ResourceProfile} given the
+ * collection of available instances.
+ *
+ * @param resourceMatchingPredicate Predicate of whether instance resource is matching
+ * @param availableInstances collection of available instances
+ * @param instanceResourceInfoLookup lookup for the current resource info of instance
+ * @return Returns a matching instance or {@link Optional#empty()} if there is none
+ */
+ Optional<InstanceID> findMatchingResource(
+ Predicate<InstanceID> resourceMatchingPredicate,
+ Collection<InstanceID> availableInstances,
+ Function<InstanceID, TaskManagerResourceInfoSnapshot> instanceResourceInfoLookup);
Review Comment:
I'd suggest moving this into `DefaultResourceAllocationStrategy` as an
internal interface.
- The new method does not match the description of `SlotMatchingStrategy`,
which is "Strategy how to find a matching slot". Even if we change the name,
the implementation of the new method is completely independent of the
existing ones, which usually indicates that the interface has two
responsibilities that can be separated.
- A side benefit is that, by making this matching strategy and its
implementations internal to `DefaultResourceAllocationStrategy`, we can simply
maintain the resource utilization in `InternalResourceInfo`, which avoids
many redundant calculations as well as the introduction of
`TaskManagerResourceInfoSnapshot`.
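
For illustration, a minimal sketch of what such an internal interface could
look like. All names here (`AllocationStrategySketch`, `ResourceInfo`,
`ResourceMatchingStrategy`, `select`) are hypothetical stand-ins and not the
actual Flink classes; the point is only that the strategy and its
implementations stay private to the allocation strategy and work directly on
its internal resource info.

```java
import java.util.Collection;
import java.util.Comparator;
import java.util.Optional;

// Hypothetical stand-in for DefaultResourceAllocationStrategy (not the real Flink class).
final class AllocationStrategySketch {

    // Hypothetical stand-in for InternalResourceInfo, carrying its own utilization.
    static final class ResourceInfo {
        final String id;
        final double utilization;

        ResourceInfo(String id, double utilization) {
            this.id = id;
            this.utilization = utilization;
        }
    }

    // Internal interface: invisible outside the enclosing class.
    private interface ResourceMatchingStrategy {
        Optional<ResourceInfo> findMatchingResource(Collection<ResourceInfo> candidates);
    }

    // Takes the first candidate in iteration order.
    private static final ResourceMatchingStrategy ANY_MATCHING =
            candidates -> candidates.stream().findFirst();

    // Takes the candidate with the lowest utilization.
    private static final ResourceMatchingStrategy LEAST_UTILIZATION =
            candidates ->
                    candidates.stream()
                            .min(Comparator.comparingDouble((ResourceInfo r) -> r.utilization));

    // The enclosing class decides which internal strategy to apply.
    static Optional<ResourceInfo> select(
            Collection<ResourceInfo> candidates, boolean evenlySpread) {
        ResourceMatchingStrategy strategy = evenlySpread ? LEAST_UTILIZATION : ANY_MATCHING;
        return strategy.findMatchingResource(candidates);
    }
}
```

With this shape, `SlotMatchingStrategy` keeps its single responsibility and no
snapshot type is needed to pass resource state across class boundaries.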
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/LeastUtilizationSlotMatchingStrategy.java:
##########
@@ -79,4 +94,21 @@ private static double calculateUtilization(
return (double) (numberRegisteredSlots - numberFreeSlots) / numberRegisteredSlots;
}
+
+ private static double calculateUtilization(
+ TaskManagerResourceInfoSnapshot instanceResourceInfoSnapshot) {
+ if (instanceResourceInfoSnapshot.getTotalProfile() == ResourceProfile.UNKNOWN
+ || instanceResourceInfoSnapshot.getAvailableProfile() == ResourceProfile.UNKNOWN) {
+ return instanceResourceInfoSnapshot.getAllocatedSlotNumber();
+ } else {
+ // Since CPU and memory are proportional in most scenarios,
+ // use cpu usage to represent utilization to simplify the logic in the first version
+ return instanceResourceInfoSnapshot
+ .getTotalProfile()
+ .getCpuCores()
+ .subtract(instanceResourceInfoSnapshot.getAvailableProfile().getCpuCores())
+ .getValue()
+ .doubleValue();
+ }
+ }
Review Comment:
The utilization can be maintained in `InternalResourceInfo` and
re-calculated only upon allocation changes.
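
A rough sketch of the idea, assuming a simplified, hypothetical stand-in for
`InternalResourceInfo` (the class name `CachedUtilizationInfo`, its fields, and
the use of plain `double` CPU values are all assumptions for illustration).
The utilization is computed once at construction and refreshed only when an
allocation succeeds, so reading it is a plain field access.

```java
// Hypothetical stand-in for InternalResourceInfo; CPU is a plain double here.
final class CachedUtilizationInfo {
    private final double totalCpu;
    private double availableCpu;
    // Cached value, refreshed only when the allocation state changes.
    private double utilization;

    CachedUtilizationInfo(double totalCpu, double availableCpu) {
        this.totalCpu = totalCpu;
        this.availableCpu = availableCpu;
        this.utilization = computeUtilization();
    }

    // Returns true and updates the cached utilization if the request fits.
    boolean tryAllocate(double requestedCpu) {
        if (requestedCpu > availableCpu) {
            return false;
        }
        availableCpu -= requestedCpu;
        utilization = computeUtilization();
        return true;
    }

    // No recalculation here: callers comparing many resources read the cached value.
    double getUtilization() {
        return utilization;
    }

    // Assumption: a resource without any capacity is treated as fully utilized.
    private double computeUtilization() {
        return totalCpu <= 0 ? 1.0 : (totalCpu - availableCpu) / totalCpu;
    }
}
```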
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -73,9 +80,28 @@ public ResourceAllocationResult tryFulfillRequirements(
BlockedTaskManagerChecker blockedTaskManagerChecker) {
final ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();
- final List<InternalResourceInfo> registeredResources =
- getAvailableResources(
- taskManagerResourceInfoProvider, resultBuilder, blockedTaskManagerChecker);
+ final List<TaskManagerInfo> registeredResources =
+ getAvailableResources(taskManagerResourceInfoProvider, blockedTaskManagerChecker);
+ final List<InstanceID> registeredResourceIdInOrder =
+ registeredResources.stream()
+ .map(TaskManagerInfo::getInstanceId)
+ .collect(Collectors.toList());
+ final Map<InstanceID, InternalResourceInfo> registeredResourcesInfo =
+ registeredResources.stream()
+ .collect(
+ Collectors.toMap(
+ TaskManagerInfo::getInstanceId,
+ t ->
+ new InternalResourceInfo(
+ t.getDefaultSlotResourceProfile(),
+ t.getTotalResource(),
+ t.getAvailableResource(),
+ (jobID, slotProfile) ->
+ resultBuilder
+ .addAllocationOnRegisteredResource(
+ jobID,
+ t.getInstanceId(),
+ slotProfile))));
Review Comment:
I'm not entirely sure why this is moved out of `getAvailableResources`.
IIUC, this is because we need both a list of `InstanceID` and a map from
`InstanceID` to `InternalResourceInfo` for
`tryFulfillRequirementsForJobWithResources`. But isn't the list the same as the
key set of the map?
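
One caveat, sketched below under assumptions: the two-argument
`Collectors.toMap` gives no guarantee about the iteration order of the
resulting map, so the key set only replaces the list if order does not matter
or an insertion-ordered map is requested explicitly. The helper name
`toOrderedMap` and the class `OrderedResourceMapSketch` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

final class OrderedResourceMapSketch {

    // Builds a map whose keySet() iterates in the same order as the input list,
    // so a separate list of IDs is not needed.
    static <K, V> Map<K, V> toOrderedMap(List<K> ids, Function<K, V> infoFactory) {
        return ids.stream()
                .collect(
                        Collectors.toMap(
                                Function.identity(),
                                infoFactory,
                                (first, second) -> {
                                    throw new IllegalStateException("duplicate id");
                                },
                                // LinkedHashMap preserves insertion order.
                                LinkedHashMap::new));
    }
}
```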
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/LeastUtilizationSlotMatchingStrategy.java:
##########
@@ -79,4 +94,21 @@ private static double calculateUtilization(
return (double) (numberRegisteredSlots - numberFreeSlots) / numberRegisteredSlots;
}
+
+ private static double calculateUtilization(
+ TaskManagerResourceInfoSnapshot instanceResourceInfoSnapshot) {
+ if (instanceResourceInfoSnapshot.getTotalProfile() == ResourceProfile.UNKNOWN
+ || instanceResourceInfoSnapshot.getAvailableProfile() == ResourceProfile.UNKNOWN) {
+ return instanceResourceInfoSnapshot.getAllocatedSlotNumber();
+ } else {
+ // Since CPU and memory are proportional in most scenarios,
+ // use cpu usage to represent utilization to simplify the logic in the first version
+ return instanceResourceInfoSnapshot
+ .getTotalProfile()
+ .getCpuCores()
+ .subtract(instanceResourceInfoSnapshot.getAvailableProfile().getCpuCores())
+ .getValue()
+ .doubleValue();
+ }
+ }
Review Comment:
Moreover, I'm not sure about assuming that CPU and memory are proportional.
In fact, one of the major reasons people need fine-grained resource
management is that they are not always proportional.
A simple way of calculating the utilization while considering both CPU and
memory is to calculate the utilization of each separately and use whichever
is larger.
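
A minimal sketch of that calculation, using plain numbers rather than Flink's
resource types. The class and method names are hypothetical, and treating a
resource with zero capacity as fully utilized is an assumption made here only
to avoid dividing by zero.

```java
final class UtilizationSketch {

    // Utilization is the larger of the CPU ratio and the memory ratio, so a task
    // manager that is short on either dimension is considered busy.
    static double utilization(
            double totalCpu,
            double availableCpu,
            long totalMemoryBytes,
            long availableMemoryBytes) {
        double cpuUtilization = ratio(totalCpu - availableCpu, totalCpu);
        double memoryUtilization =
                ratio((double) (totalMemoryBytes - availableMemoryBytes), (double) totalMemoryBytes);
        return Math.max(cpuUtilization, memoryUtilization);
    }

    // Assumption: zero (or negative) capacity counts as fully utilized.
    private static double ratio(double used, double total) {
        return total <= 0 ? 1.0 : used / total;
    }
}
```

For example, a task manager with 1 of 4 cores in use but 768 of 1024 bytes of
memory in use gets a utilization of 0.75 rather than 0.25.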
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -135,7 +154,45 @@ private static List<InternalResourceInfo> getPendingResources(
.collect(Collectors.toList());
}
- private static int tryFulfilledRequirementWithResource(
+ private int tryFulfilledRequirementWithAvailableResources(
Review Comment:
I don't think there is much difference between fulfilling requirements from
existing available resources and from pending resources. We only need an
argument to tell which strategy should be used.
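
Roughly, a single method could take the strategy as a parameter. The sketch
below is only an illustration under simplifying assumptions: `FulfillSketch`,
`Resource`, `FIRST_FIT`, and `MOST_FREE` are hypothetical names, and resources
are reduced to a count of free slots.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;
import java.util.stream.Collectors;

final class FulfillSketch {

    // Hypothetical resource, reduced to an id and a number of free slots.
    static final class Resource {
        final String id;
        int freeSlots;

        Resource(String id, int freeSlots) {
            this.id = id;
            this.freeSlots = freeSlots;
        }
    }

    // Strategy that takes the first candidate, e.g. for pending resources.
    static final Function<List<Resource>, Optional<Resource>> FIRST_FIT =
            candidates -> candidates.stream().findFirst();

    // Strategy that prefers the candidate with the most free slots.
    static final Function<List<Resource>, Optional<Resource>> MOST_FREE =
            candidates ->
                    candidates.stream()
                            .max(Comparator.comparingInt((Resource r) -> r.freeSlots));

    // One method for both kinds of resources; only the strategy argument differs.
    // Returns the number of requirements that could not be fulfilled.
    static int tryFulfill(
            int numRequirements,
            List<Resource> resources,
            Function<List<Resource>, Optional<Resource>> strategy) {
        int remaining = numRequirements;
        while (remaining > 0) {
            List<Resource> candidates =
                    resources.stream().filter(r -> r.freeSlots > 0).collect(Collectors.toList());
            Optional<Resource> chosen = strategy.apply(candidates);
            if (!chosen.isPresent()) {
                break;
            }
            chosen.get().freeSlots--;
            remaining--;
        }
        return remaining;
    }
}
```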
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -73,9 +80,28 @@ public ResourceAllocationResult tryFulfillRequirements(
BlockedTaskManagerChecker blockedTaskManagerChecker) {
final ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();
- final List<InternalResourceInfo> registeredResources =
- getAvailableResources(
- taskManagerResourceInfoProvider, resultBuilder, blockedTaskManagerChecker);
+ final List<TaskManagerInfo> registeredResources =
+ getAvailableResources(taskManagerResourceInfoProvider, blockedTaskManagerChecker);
+ final List<InstanceID> registeredResourceIdInOrder =
+ registeredResources.stream()
+ .map(TaskManagerInfo::getInstanceId)
+ .collect(Collectors.toList());
Review Comment:
The variable is named `registeredResourceIdInOrder`. However, I don't see
what the order is.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -73,9 +80,28 @@ public ResourceAllocationResult tryFulfillRequirements(
BlockedTaskManagerChecker blockedTaskManagerChecker) {
final ResourceAllocationResult.Builder resultBuilder = ResourceAllocationResult.builder();
- final List<InternalResourceInfo> registeredResources =
- getAvailableResources(
- taskManagerResourceInfoProvider, resultBuilder, blockedTaskManagerChecker);
+ final List<TaskManagerInfo> registeredResources =
+ getAvailableResources(taskManagerResourceInfoProvider, blockedTaskManagerChecker);
+ final List<InstanceID> registeredResourceIdInOrder =
+ registeredResources.stream()
+ .map(TaskManagerInfo::getInstanceId)
+ .collect(Collectors.toList());
+ final Map<InstanceID, InternalResourceInfo> registeredResourcesInfo =
+ registeredResources.stream()
+ .collect(
+ Collectors.toMap(
+ TaskManagerInfo::getInstanceId,
+ t ->
+ new InternalResourceInfo(
+ t.getDefaultSlotResourceProfile(),
+ t.getTotalResource(),
+ t.getAvailableResource(),
+ (jobID, slotProfile) ->
+ resultBuilder
+ .addAllocationOnRegisteredResource(
+ jobID,
+ t.getInstanceId(),
+ slotProfile))));
Review Comment:
And if we work on `InternalResourceInfo` directly when selecting the least
utilized resource, this complexity can be avoided.