akpatnam25 commented on code in PR #3347:
URL: https://github.com/apache/celeborn/pull/3347#discussion_r2196666568


##########
master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java:
##########
@@ -246,11 +267,60 @@ private static StorageInfo getStorageInfo(
     return storageInfo;
   }
 
+  /**
+   * If interruptionAware = true, select workers based on 2 main criteria: <br>
+   * 1. Workers that have no nextInterruptionNotice are the first priority and are included in the
+   * 1st pass for slot selection. <br>
+   * 2. Workers that have a later interruption notice are a little less deprioritized, and are
+   * included in the 2nd pass for slot selection. This is determined by nextInterruptionNotice above
+   * a certain percentage threshold.<br>
+   * All other workers are considered least priority, and are only included for slot selection in
+   * the worst case. <br>
+   */
+  static Tuple3<List<WorkerInfo>, List<WorkerInfo>, List<WorkerInfo>>
+      prioritizeWorkersBasedOnInterruptionNotice(
+          List<WorkerInfo> workers,
+          boolean shouldReplicate,
+          boolean shouldRackAware,
+          double percentileThreshold) {
+    Map<Boolean, List<WorkerInfo>> partitioned =
+        workers.stream().collect(Collectors.partitioningBy(WorkerInfo::hasInterruptionNotice));
+    List<WorkerInfo> workersWithInterruptions = partitioned.get(true);
+    List<WorkerInfo> workersWithoutInterruptions = partitioned.get(false);
+    // Timestamps towards the boundary of `percentileThreshold` might be the same. Given this
+    // is a stable sort, it makes sense to randomize these hosts so that the same hosts are not
+    // consistently selected.
+    Collections.shuffle(workersWithInterruptions);
+    workersWithInterruptions.sort(
+        Comparator.comparingLong(WorkerInfo::nextInterruptionNotice).reversed());
+    int requiredNodes =
+        (int) Math.floor((percentileThreshold * workersWithInterruptions.size()) / 100.0);
+
+    List<WorkerInfo> workersWithLateInterruptions =
+        new ArrayList<>(workersWithInterruptions.subList(0, requiredNodes));
+    List<WorkerInfo> workersWithEarlyInterruptions =
+        new ArrayList<>(
+            workersWithInterruptions.subList(requiredNodes, workersWithInterruptions.size()));
+    if (shouldReplicate && shouldRackAware) {
+      return Tuple3.apply(
+          generateRackAwareWorkers(workersWithoutInterruptions),
+          generateRackAwareWorkers(workersWithLateInterruptions),
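
A minimal, self-contained sketch of the three-tier split in the diff above, for running it outside Celeborn. `Worker` is a hypothetical stand-in for `WorkerInfo` that models only `hasInterruptionNotice()` and `nextInterruptionNotice()`; the `NO_NOTICE` sentinel, the injectable `Random`, and the name `InterruptionTiers.split` are assumptions for illustration, and rack awareness is left out.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;

// Hypothetical stand-in for WorkerInfo: only the two accessors used by the diff.
final class Worker {
  static final long NO_NOTICE = -1L; // assumption: sentinel meaning "no notice"
  private final String host;
  private final long notice;

  Worker(String host, long notice) {
    this.host = host;
    this.notice = notice;
  }

  boolean hasInterruptionNotice() {
    return notice != NO_NOTICE;
  }

  long nextInterruptionNotice() {
    return notice;
  }

  @Override
  public String toString() {
    return host;
  }
}

final class InterruptionTiers {
  // Returns [no-notice, late-notice, early-notice] tiers, mirroring the diff.
  static List<List<Worker>> split(List<Worker> workers, double percentileThreshold, Random rnd) {
    Map<Boolean, List<Worker>> partitioned =
        workers.stream().collect(Collectors.partitioningBy(Worker::hasInterruptionNotice));
    // Copy into ArrayLists so shuffle/sort never hit an unmodifiable list.
    List<Worker> withNotice = new ArrayList<>(partitioned.get(true));
    List<Worker> withoutNotice = new ArrayList<>(partitioned.get(false));
    // Shuffle first: List.sort is stable, so equal timestamps keep this random order.
    Collections.shuffle(withNotice, rnd);
    // Latest notice first: these workers are expected to stay up the longest.
    withNotice.sort(Comparator.comparingLong(Worker::nextInterruptionNotice).reversed());
    int lateCount = (int) Math.floor(percentileThreshold * withNotice.size() / 100.0);
    List<List<Worker>> tiers = new ArrayList<>();
    tiers.add(withoutNotice);
    tiers.add(new ArrayList<>(withNotice.subList(0, lateCount)));
    tiers.add(new ArrayList<>(withNotice.subList(lateCount, withNotice.size())));
    return tiers;
  }
}
```

For example, with four workers whose notices are at 100, 200, 300 and 400 and `percentileThreshold = 50`, `floor(50 * 4 / 100) = 2`, so the workers with notices at 400 and 300 form the late tier and those at 200 and 100 the early tier.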

Review Comment:
   @mridulm The new combined list contains both `workersWithoutInterruptions` and `workersWithLateInterruptions`, so I think we still need to call `generateRackAwareWorkers` on the combined list, right?
   
   ```
         // Primary candidates: workers with no notice first, then those with late notices.
         List<WorkerInfo> primaryWorkerCandidates = new ArrayList<>(workersWithoutInterruptions);
         primaryWorkerCandidates.addAll(workersWithLateInterruptions);
         if (shouldReplicate && shouldRackAware) {
           // Apply rack-aware ordering to the whole combined list, not to each part separately.
           primaryWorkerCandidates = generateRackAwareWorkers(primaryWorkerCandidates);
         }
   ```
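   
   A hedged sketch of why the rack-aware ordering should run on the combined list. `interleaveByRack` is a hypothetical stand-in for `generateRackAwareWorkers` that assumes it groups workers by rack (in first-seen order) and emits them round-robin; the real helper may behave differently. Workers are encoded as `rack:host` strings only for brevity, and `sameRackNeighbours` is a hypothetical helper that counts adjacent pairs on the same rack.
   
```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class RackInterleave {
  // Hypothetical stand-in for generateRackAwareWorkers: group by rack in
  // first-seen order, then take one worker per rack in round-robin fashion.
  static List<String> interleaveByRack(List<String> workers) {
    Map<String, Deque<String>> byRack = new LinkedHashMap<>();
    for (String w : workers) {
      byRack.computeIfAbsent(rackOf(w), k -> new ArrayDeque<>()).add(w);
    }
    List<String> out = new ArrayList<>(workers.size());
    while (out.size() < workers.size()) {
      for (Deque<String> q : byRack.values()) {
        if (!q.isEmpty()) {
          out.add(q.poll());
        }
      }
    }
    return out;
  }

  // Workers are encoded as "rack:host" purely for this sketch.
  static String rackOf(String worker) {
    return worker.substring(0, worker.indexOf(':'));
  }

  // Counts adjacent pairs that share a rack (lower is better).
  static int sameRackNeighbours(List<String> ordered) {
    int n = 0;
    for (int i = 1; i < ordered.size(); i++) {
      if (rackOf(ordered.get(i)).equals(rackOf(ordered.get(i - 1)))) {
        n++;
      }
    }
    return n;
  }
}
```
   
   With `workersWithoutInterruptions = [r1:a]` and `workersWithLateInterruptions = [r1:b, r2:c]`, ordering each list separately and concatenating gives `[r1:a, r1:b, r2:c]`, which has one same-rack pair at the boundary, while ordering the combined list gives `[r1:a, r2:c, r1:b]`, which has none.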


