xintongsong commented on a change in pull request #14869:
URL: https://github.com/apache/flink/pull/14869#discussion_r570737663



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/TaskExecutorManager.java
##########
@@ -402,27 +407,31 @@ private void releaseIdleTaskExecutor(InstanceID timedOutTaskManagerId) {
     // ---------------------------------------------------------------------------------------------
 
     public ResourceProfile getTotalRegisteredResources() {
-        return getResourceFromNumSlots(getNumberRegisteredSlots());
+        return taskManagerRegistrations.values().stream()
+                .map(TaskManagerRegistration::getTotalResource)
+                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
     }
 
     public ResourceProfile getTotalRegisteredResourcesOf(InstanceID instanceID) {
-        return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
+        return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
+                .map(TaskManagerRegistration::getTotalResource)
+                .orElse(ResourceProfile.ZERO);
     }
 
     public ResourceProfile getTotalFreeResources() {
-        return getResourceFromNumSlots(getNumberFreeSlots());
+        return taskManagerRegistrations.keySet().stream()
+                .map(this::getTotalFreeResourcesOf)
+                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);

Review comment:
       Same here: better to iterate over the entry set than over the key set with a lookup per key.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
##########
@@ -230,30 +230,34 @@ public int getNumberFreeSlotsOf(InstanceID instanceId) {
 
     @Override
     public ResourceProfile getRegisteredResource() {
-        return getResourceFromNumSlots(getNumberRegisteredSlots());
+        return taskManagerRegistrations.values().stream()
+                .map(TaskManagerRegistration::getTotalResource)
+                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);
     }
 
     @Override
     public ResourceProfile getRegisteredResourceOf(InstanceID instanceID) {
-        return getResourceFromNumSlots(getNumberRegisteredSlotsOf(instanceID));
+        return Optional.ofNullable(taskManagerRegistrations.get(instanceID))
+                .map(TaskManagerRegistration::getTotalResource)
+                .orElse(ResourceProfile.ZERO);
     }
 
     @Override
     public ResourceProfile getFreeResource() {
-        return getResourceFromNumSlots(getNumberFreeSlots());
+        return taskManagerRegistrations.keySet().stream()
+                .map(this::getFreeResourceOf)
+                .reduce(ResourceProfile.ZERO, ResourceProfile::merge);

Review comment:
       Better to iterate over the entry set than to iterate over the key set and then look up each value by its key, which costs an extra map lookup per element.
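
       To illustrate the difference, here is a minimal standalone sketch. It does not use the actual Flink classes: `Registration`, `EntrySetIterationSketch`, and the slot counts are hypothetical stand-ins, and plain integers take the place of `ResourceProfile`. Both methods return the same sum; the second one simply avoids the per-key `get`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a registration; NOT the real TaskManagerRegistration.
final class Registration {
    final int totalSlots;
    final int usedSlots;

    Registration(int totalSlots, int usedSlots) {
        this.totalSlots = totalSlots;
        this.usedSlots = usedSlots;
    }

    int freeSlots() {
        return totalSlots - usedSlots;
    }
}

final class EntrySetIterationSketch {

    // Key-set style: every key triggers an additional map lookup.
    static int freeViaKeySet(Map<String, Registration> registrations) {
        return registrations.keySet().stream()
                .map(id -> registrations.get(id).freeSlots())
                .reduce(0, Integer::sum);
    }

    // Entry-set style: key and value arrive together, so no second lookup is needed.
    static int freeViaEntrySet(Map<String, Registration> registrations) {
        return registrations.entrySet().stream()
                .map(entry -> entry.getValue().freeSlots())
                .reduce(0, Integer::sum);
    }

    public static void main(String[] args) {
        Map<String, Registration> registrations = new LinkedHashMap<>();
        registrations.put("tm-1", new Registration(4, 1));
        registrations.put("tm-2", new Registration(2, 2));
        System.out.println(freeViaKeySet(registrations));
        System.out.println(freeViaEntrySet(registrations));
    }
}
```

       If the key is not needed at all, iterating over `values()` would be simpler still; the entry set is the better fit when both the key and the value are used.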




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

