wanglijie95 commented on code in PR #22424:
URL: https://github.com/apache/flink/pull/22424#discussion_r1174715430


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java:
##########
@@ -218,11 +205,59 @@ public Collection<FreeSlotInfo> getFreeSlotsInformation() {
         return freeSlotInfos;
     }
 
+    public double getTaskExecutorUtilization(ResourceID resourceId) {
+        Set<AllocationID> slots = slotsPerTaskExecutor.get(resourceId);
+        Preconditions.checkNotNull(slots, "There is no slots on %s", resourceId);
+
+        return (double) (slots.size() - freeSlots.getFreeSlotsNumberOfTaskExecutor(resourceId))
+                / slots.size();
+    }
+
     @Override
     public Collection<? extends SlotInfo> getAllSlotsInformation() {
         return registeredSlots.values();
     }
 
+    private static final class FreeSlots {
+        /** Map containing all free slots and since when they are free. */
+        private final Map<AllocationID, Long> freeSlotsSince = new HashMap<>();
+
+        /** Index containing a mapping between TaskExecutors and their free slots number. */
+        private final Map<ResourceID, Integer> freeSlotsNumberPerTaskExecutor = new HashMap<>();
+
+        public void addFreeSlot(AllocatedSlot slot, long currentTime) {

Review Comment:
   I'd prefer to change this method to `addFreeSlot(SlotInfo, long)` or 
`addFreeSlot(AllocationID, ResourceID, long)`. Passing a whole `AllocatedSlot` 
is a bit redundant, since `addFreeSlot` only reads its allocation ID and task 
manager ID. The same applies to `removeFreeSlot`.
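
To illustrate the second signature, here is a minimal sketch. It assumes plain `String` keys as stand-ins for `AllocationID` and `ResourceID`, the class name `FreeSlotsBySignature` is hypothetical, and an explicit `IllegalStateException` stands in for `Preconditions.checkState`. The body mirrors the logic in the diff; only the parameters change.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for FreeSlots; String replaces AllocationID and ResourceID. */
final class FreeSlotsBySignature {
    /** Free slots and the time since which they have been free. */
    private final Map<String, Long> freeSlotsSince = new HashMap<>();

    /** Number of free slots per TaskExecutor. */
    private final Map<String, Integer> freeSlotsNumberPerTaskExecutor = new HashMap<>();

    /** Takes only the two identifiers the bookkeeping reads, not a whole slot object. */
    void addFreeSlot(String allocationId, String taskExecutorId, long currentTime) {
        // Same check as in the diff: adding an already free slot is an error.
        if (freeSlotsSince.containsKey(allocationId)) {
            throw new IllegalStateException(
                    "Slot with id " + allocationId + " has been freed");
        }
        freeSlotsSince.put(allocationId, currentTime);
        freeSlotsNumberPerTaskExecutor.put(
                taskExecutorId,
                freeSlotsNumberPerTaskExecutor.getOrDefault(taskExecutorId, 0) + 1);
    }

    int getFreeSlotsNumberOfTaskExecutor(String taskExecutorId) {
        return freeSlotsNumberPerTaskExecutor.getOrDefault(taskExecutorId, 0);
    }
}
```

With this shape, callers pass `slot.getAllocationId()` and `slot.getTaskManagerId()` explicitly, and `FreeSlots` no longer depends on `AllocatedSlot`.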



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java:
##########
@@ -218,11 +205,59 @@ public Collection<FreeSlotInfo> getFreeSlotsInformation() {
         return freeSlotInfos;
     }
 
+    public double getTaskExecutorUtilization(ResourceID resourceId) {
+        Set<AllocationID> slots = slotsPerTaskExecutor.get(resourceId);
+        Preconditions.checkNotNull(slots, "There is no slots on %s", resourceId);
+
+        return (double) (slots.size() - freeSlots.getFreeSlotsNumberOfTaskExecutor(resourceId))
+                / slots.size();
+    }
+
     @Override
     public Collection<? extends SlotInfo> getAllSlotsInformation() {
         return registeredSlots.values();
     }
 
+    private static final class FreeSlots {
+        /** Map containing all free slots and since when they are free. */
+        private final Map<AllocationID, Long> freeSlotsSince = new HashMap<>();
+
+        /** Index containing a mapping between TaskExecutors and their free slots number. */
+        private final Map<ResourceID, Integer> freeSlotsNumberPerTaskExecutor = new HashMap<>();
+
+        public void addFreeSlot(AllocatedSlot slot, long currentTime) {
+            Preconditions.checkState(
+                    !freeSlotsSince.containsKey(slot.getAllocationId()),
+                    "Slot with id %s has been freed",
+                    slot.getAllocationId());
+            freeSlotsSince.put(slot.getAllocationId(), currentTime);
+            freeSlotsNumberPerTaskExecutor.put(
+                    slot.getTaskManagerId(),
+                    freeSlotsNumberPerTaskExecutor.getOrDefault(slot.getTaskManagerId(), 0) + 1);
+        }
+
+        public Long removeFreeSlot(AllocatedSlot slot) {

Review Comment:
   I suggest taking advantage of the return value of the `remove` method and 
using `computeIfPresent` here. A suggestion:
   ```
   Long freeSince = freeSlotsSince.remove(slot.getAllocationId());
   if (freeSince != null) {
       freeSlotsNumberPerTaskExecutor.computeIfPresent(
               slot.getTaskManagerId(),
               (ignore, count) -> {
                   int newCount = count - 1;
                   return newCount == 0 ? null : newCount;
               });
   }
   
   return freeSince;
   ```
   Please correct me if I'm wrong :)
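
For reference, a self-contained sketch of the removal logic suggested above. It assumes `String` stand-ins for `AllocationID` and `ResourceID`. The class name `FreeSlotsRemoveSketch` and the helper `tracksTaskExecutor` are hypothetical and exist only so the behavior can be checked in isolation. It relies on the documented `Map.computeIfPresent` behavior that returning `null` from the remapping function removes the entry.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for FreeSlots; String replaces AllocationID and ResourceID. */
final class FreeSlotsRemoveSketch {
    private final Map<String, Long> freeSlotsSince = new HashMap<>();
    private final Map<String, Integer> freeSlotsNumberPerTaskExecutor = new HashMap<>();

    void addFreeSlot(String allocationId, String taskExecutorId, long currentTime) {
        if (freeSlotsSince.put(allocationId, currentTime) == null) {
            freeSlotsNumberPerTaskExecutor.merge(taskExecutorId, 1, Integer::sum);
        }
    }

    /** Returns the time since which the slot was free, or null if it was not free. */
    Long removeFreeSlot(String allocationId, String taskExecutorId) {
        // remove() reports whether the slot was tracked, so no containsKey() is needed.
        Long freeSince = freeSlotsSince.remove(allocationId);
        if (freeSince != null) {
            freeSlotsNumberPerTaskExecutor.computeIfPresent(
                    taskExecutorId,
                    (ignored, count) -> {
                        int newCount = count - 1;
                        // Returning null drops the entry once the last free slot is gone.
                        return newCount == 0 ? null : newCount;
                    });
        }
        return freeSince;
    }

    int getFreeSlotsNumberOfTaskExecutor(String taskExecutorId) {
        return freeSlotsNumberPerTaskExecutor.getOrDefault(taskExecutorId, 0);
    }

    /** Hypothetical helper: true while the index still holds an entry for the executor. */
    boolean tracksTaskExecutor(String taskExecutorId) {
        return freeSlotsNumberPerTaskExecutor.containsKey(taskExecutorId);
    }
}
```

Removing a slot that is not free returns `null` and leaves the per-TaskExecutor count untouched, so the two maps cannot drift apart.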



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java:
##########
@@ -218,11 +205,59 @@ public Collection<FreeSlotInfo> getFreeSlotsInformation() {
         return freeSlotInfos;
     }
 
+    public double getTaskExecutorUtilization(ResourceID resourceId) {
+        Set<AllocationID> slots = slotsPerTaskExecutor.get(resourceId);
+        Preconditions.checkNotNull(slots, "There is no slots on %s", resourceId);
+
+        return (double) (slots.size() - freeSlots.getFreeSlotsNumberOfTaskExecutor(resourceId))
+                / slots.size();
+    }
+
     @Override
     public Collection<? extends SlotInfo> getAllSlotsInformation() {
         return registeredSlots.values();
     }
 
+    private static final class FreeSlots {
+        /** Map containing all free slots and since when they are free. */
+        private final Map<AllocationID, Long> freeSlotsSince = new HashMap<>();
+
+        /** Index containing a mapping between TaskExecutors and their free slots number. */
+        private final Map<ResourceID, Integer> freeSlotsNumberPerTaskExecutor = new HashMap<>();
+
+        public void addFreeSlot(AllocatedSlot slot, long currentTime) {

Review Comment:
   Although the current implementation may not pass in an already freed slot, 
I suggest keeping the same logic as before. A suggestion:
   ```
   if (freeSlotsSince.put(slot.getAllocationId(), currentTime) == null) {
       freeSlotsNumberPerTaskExecutor.merge(slot.getTaskManagerId(), 1, Integer::sum);
   }
   ```
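
A small sketch of how this variant behaves when the same slot is added twice. It assumes `String` stand-ins for `AllocationID` and `ResourceID`. The class name `FreeSlotsAddSketch` and the accessor `getFreeSince` are hypothetical. Because `Map.put` returns the previous value, the counter is incremented only for slots that were not already free, while the timestamp is overwritten on a repeated add.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for FreeSlots; String replaces AllocationID and ResourceID. */
final class FreeSlotsAddSketch {
    private final Map<String, Long> freeSlotsSince = new HashMap<>();
    private final Map<String, Integer> freeSlotsNumberPerTaskExecutor = new HashMap<>();

    void addFreeSlot(String allocationId, String taskExecutorId, long currentTime) {
        // put() returns null only if the slot was not tracked as free before.
        if (freeSlotsSince.put(allocationId, currentTime) == null) {
            // merge() inserts 1 for a new key, otherwise adds 1 to the existing count.
            freeSlotsNumberPerTaskExecutor.merge(taskExecutorId, 1, Integer::sum);
        }
    }

    /** Hypothetical accessor: the recorded free-since time, or null if the slot is not free. */
    Long getFreeSince(String allocationId) {
        return freeSlotsSince.get(allocationId);
    }

    int getFreeSlotsNumberOfTaskExecutor(String taskExecutorId) {
        return freeSlotsNumberPerTaskExecutor.getOrDefault(taskExecutorId, 0);
    }
}
```

Compared with the `checkState` version in the diff, a repeated add does not throw and does not double count. It only refreshes the free-since time.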


