reswqa commented on code in PR #22424:
URL: https://github.com/apache/flink/pull/22424#discussion_r1170857542


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultAllocatedSlotPool.java:
##########
@@ -218,11 +205,66 @@ public Collection<FreeSlotInfo> getFreeSlotsInformation() 
{
         return freeSlotInfos;
     }
 
+    public double getTaskExecutorUtilization(ResourceID resourceId) {
+        Set<AllocationID> slots = slotsPerTaskExecutor.get(resourceId);
+        Preconditions.checkNotNull(slots, "There is no slots on %s", 
resourceId);
+
+        return (double) (slots.size() - 
freeSlots.getSlotsOfTaskExecutor(resourceId).size())
+                / slots.size();
+    }
+
     @Override
     public Collection<? extends SlotInfo> getAllSlotsInformation() {
         return registeredSlots.values();
     }
 
+    private static final class FreeSlots {
+        /** Map containing all free slots and since when they are free. */
+        private final Map<AllocationID, Long> freeSlotsSince;
+
+        /** Index containing a mapping between TaskExecutors and their free 
slots. */
+        private final Map<ResourceID, Set<AllocationID>> 
freeSlotsPerTaskExecutor;

Review Comment:
   Can we initialize these two fields here and remove the constructor.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotInfoWithUtilization.java:
##########
@@ -65,7 +80,13 @@ public boolean willBeOccupiedIndefinitely() {
         return slotInfoDelegate.willBeOccupiedIndefinitely();
     }
 
+    @VisibleForTesting
     public static SlotInfoWithUtilization from(SlotInfo slotInfo, double 
taskExecutorUtilization) {
         return new SlotInfoWithUtilization(slotInfo, taskExecutorUtilization);
     }

Review Comment:
   I'd prefer remove this method as its only used in test.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotInfoWithUtilization.java:
##########
@@ -65,7 +80,13 @@ public boolean willBeOccupiedIndefinitely() {
         return slotInfoDelegate.willBeOccupiedIndefinitely();
     }
 
+    @VisibleForTesting
     public static SlotInfoWithUtilization from(SlotInfo slotInfo, double 
taskExecutorUtilization) {
         return new SlotInfoWithUtilization(slotInfo, taskExecutorUtilization);
     }
+
+    public static SlotInfoWithUtilization from(
+            SlotInfo slotInfo, Function<ResourceID, Double> 
taskExecutorUtilizationFunc) {

Review Comment:
   ```suggestion
               SlotInfo slotInfo, Function<ResourceID, Double> 
taskExecutorUtilizationLookup) {
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/LocationPreferenceSlotSelectionStrategy.java:
##########
@@ -122,7 +121,7 @@ protected abstract Optional<SlotInfoAndLocality> 
selectWithoutLocationPreference
             @Nonnull ResourceProfile resourceProfile);
 
     protected abstract double calculateCandidateScore(
-            int localWeigh, int hostLocalWeigh, double 
taskExecutorUtilization);
+            int localWeigh, int hostLocalWeigh, SlotInfoAndResources 
candidate);

Review Comment:
   How about change this method to
   ```
   protected abstract double calculateCandidateScore(
               int localWeigh, int hostLocalWeigh, Supplier<Double> 
taskExecutorUtilizationSupplier);
   ```
   As it stands, passing `SlotInfoAndResources` seems redundant.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotInfoWithUtilization.java:
##########
@@ -18,26 +18,41 @@
 
 package org.apache.flink.runtime.jobmaster.slotpool;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobmaster.SlotInfo;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 
+import java.util.function.Function;
+
 /**
  * Container for {@link SlotInfo} and the task executors utilization 
(freeSlots /
  * totalOfferedSlots).
  */
 public final class SlotInfoWithUtilization implements SlotInfo {
     private final SlotInfo slotInfoDelegate;
-    private final double taskExecutorUtilization;
+    private final Function<ResourceID, Double> taskExecutorUtilizationLookup;
 
     private SlotInfoWithUtilization(SlotInfo slotInfo, double 
taskExecutorUtilization) {
+        this(slotInfo, resourceId -> taskExecutorUtilization);
+    }
+
+    private SlotInfoWithUtilization(
+            SlotInfo slotInfo, Function<ResourceID, Double> 
taskExecutorUtilizationLookup) {
         this.slotInfoDelegate = slotInfo;
-        this.taskExecutorUtilization = taskExecutorUtilization;
+        this.taskExecutorUtilizationLookup = taskExecutorUtilizationLookup;
     }
 
+    @VisibleForTesting
     double getTaskExecutorUtilization() {
-        return taskExecutorUtilization;
+        return taskExecutorUtilizationLookup.apply(
+                slotInfoDelegate.getTaskManagerLocation().getResourceID());
+    }

Review Comment:
   This method should be removed. The rule of thumb is: If we can think of a 
way to do verification, it is best not to introduce methods that are purely for 
testing.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to