xintongsong commented on a change in pull request #14869:
URL: https://github.com/apache/flink/pull/14869#discussion_r570737663
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
##########
@@ -402,27 +407,31 @@ private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
// ---------------------------------------------------------------------------------------------
public ResourceProfile getTotalRegisteredResources() {
- return getResourceFromNumSlots(getNumberRegisteredSlots());
+ return taskManagerRegistrations.values().stream()
+ .map(TaskManagerRegistration::getTotalResource)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
}
public ResourceProfile getTotalRegisteredResourcesOf(InstanceID instanceID) {
- return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
+ return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
+ .map(TaskManagerRegistration::getTotalResource)
+ .orElse(ResourceProfile.ZERO);
}
public ResourceProfile getTotalFreeResources() {
- return getResourceFromNumSlots(getNumberFreeSlots());
+ return taskManagerRegistrations.keySet().stream()
+ .map(this::getTotalFreeResourcesOf)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
Review comment:
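Same here: better to iterate over the entry set than to iterate over the
key set and look up each entry by its key.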
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -230,30 +230,34 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) {
@Override
public ResourceProfile getRegisteredResource() {
- return getResourceFromNumSlots(getNumberRegisteredSlots());
+ return taskManagerRegistrations.values().stream()
+ .map(TaskManagerRegistration::getTotalResource)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
}
@Override
public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
- return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
+ return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
+ .map(TaskManagerRegistration::getTotalResource)
+ .orElse(ResourceProfile.ZERO);
}
@Override
public ResourceProfile getFreeResource() {
- return getResourceFromNumSlots(getNumberFreeSlots());
+ return taskManagerRegistrations.keySet().stream()
+ .map(this::getFreeResourceOf)
+ .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
Review comment:
Better to iterate over the entry set than to iterate over the key set and
look up each entry by its key.
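
A minimal sketch of the difference, not the actual Flink code. The class
FreeResourceSketch, the Registration type, and the int "free slots" value
are hypothetical stand-ins for TaskManagerRegistration and ResourceProfile.
Both methods return the same sum; the second avoids one map lookup per key.
Here only the values are needed, so it streams values(); entrySet() works
the same way when the key is needed as well.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-ins for the Flink types in the diff;
// an int replaces ResourceProfile so the sketch stays self-contained.
class FreeResourceSketch {

    static final class Registration {
        private final int freeSlots;

        Registration(int freeSlots) {
            this.freeSlots = freeSlots;
        }

        int getFreeSlots() {
            return freeSlots;
        }
    }

    // Pattern from the diff: stream the keys, then look each entry up again.
    static int sumViaKeySet(Map<String, Registration> registrations) {
        return registrations.keySet().stream()
                .map(id -> registrations.get(id).getFreeSlots())
                .reduce(0, Integer::sum);
    }

    // Suggested pattern: stream the values directly, with no extra lookups.
    static int sumViaValues(Map<String, Registration> registrations) {
        return registrations.values().stream()
                .map(Registration::getFreeSlots)
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        Map<String, Registration> registrations = new LinkedHashMap<>();
        registrations.put("tm-1", new Registration(2));
        registrations.put("tm-2", new Registration(5));
        System.out.println(sumViaKeySet(registrations));
        System.out.println(sumViaValues(registrations));
    }
}
```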