huwh commented on code in PR #22176:
URL: https://github.com/apache/flink/pull/22176#discussion_r1145769960
##########
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##########
@@ -73,9 +80,28 @@ public ResourceAllocationResult tryFulfillRequirements(
BlockedTaskManagerChecker blockedTaskManagerChecker) {
final ResourceAllocationResult.Builder resultBuilder =
ResourceAllocationResult.builder();
- final List<InternalResourceInfo> registeredResources =
- getAvailableResources(
- taskManagerResourceInfoProvider, resultBuilder, blockedTaskManagerChecker);
+ final List<TaskManagerInfo> registeredResources =
+ getAvailableResources(taskManagerResourceInfoProvider, blockedTaskManagerChecker);
+ final List<InstanceID> registeredResourceIdInOrder =
+ registeredResources.stream()
+ .map(TaskManagerInfo::getInstanceId)
+ .collect(Collectors.toList());
+ final Map<InstanceID, InternalResourceInfo> registeredResourcesInfo =
+ registeredResources.stream()
+ .collect(
+ Collectors.toMap(
+ TaskManagerInfo::getInstanceId,
+ t ->
+ new InternalResourceInfo(
+ t.getDefaultSlotResourceProfile(),
+ t.getTotalResource(),
+ t.getAvailableResource(),
+ (jobID, slotProfile) ->
+ resultBuilder
+ .addAllocationOnRegisteredResource(
+ jobID,
+ t.getInstanceId(),
+ slotProfile))));
Review Comment:
This is moved out to obtain both the ordered instance list and the
InternalResourceInfo map, so that resources can be compared between instances.
I will check whether we can ignore the instance ID if we move the
slotMatchingStrategy here as an internal interface.
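
To illustrate the idea, here is a minimal, self-contained sketch of the "ordered ID list plus lookup map" pattern. It is not the Flink code: `Worker` and `Bookkeeping` are hypothetical stand-ins for `TaskManagerInfo` and `InternalResourceInfo`, a single CPU number stands in for a resource profile, and the tightest-fit selection is just an assumed example policy for comparing instances.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class OrderedResourceLookupSketch {

    /** Hypothetical stand-in for TaskManagerInfo; only what the sketch needs. */
    public static final class Worker {
        final String instanceId;
        final double availableCpu;

        public Worker(String instanceId, double availableCpu) {
            this.instanceId = instanceId;
            this.availableCpu = availableCpu;
        }
    }

    /** Hypothetical stand-in for InternalResourceInfo: mutable per-instance state. */
    public static final class Bookkeeping {
        double availableCpu;

        Bookkeeping(double availableCpu) {
            this.availableCpu = availableCpu;
        }
    }

    /** Keeps the registration order, which a plain HashMap would not preserve. */
    public static List<String> idsInOrder(List<Worker> workers) {
        return workers.stream().map(w -> w.instanceId).collect(Collectors.toList());
    }

    /** Builds the ID-to-bookkeeping lookup; assumes instance IDs are unique. */
    public static Map<String, Bookkeeping> bookkeepingById(List<Worker> workers) {
        return workers.stream()
                .collect(Collectors.toMap(w -> w.instanceId, w -> new Bookkeeping(w.availableCpu)));
    }

    /**
     * Assumed example policy: pick the instance with the least remaining CPU
     * that still fits the request. Ties keep the earlier instance in the list.
     * Returns null when no instance can fit the request.
     */
    public static String pickTightestFit(
            List<String> ids, Map<String, Bookkeeping> infoById, double requestedCpu) {
        String best = null;
        for (String id : ids) {
            Bookkeeping info = infoById.get(id);
            if (info.availableCpu < requestedCpu) {
                continue; // this instance cannot fit the request
            }
            if (best == null || info.availableCpu < infoById.get(best).availableCpu) {
                best = id;
            }
        }
        if (best != null) {
            infoById.get(best).availableCpu -= requestedCpu; // record the allocation
        }
        return best;
    }
}
```

The list fixes the iteration order and the map gives per-instance state that can be compared and updated. If the matching logic lived next to the map, as suggested above, the ID might only be needed for the final allocation callback.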