akpatnam25 commented on code in PR #3347:
URL: https://github.com/apache/celeborn/pull/3347#discussion_r2206527755
##########
master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java:
##########
@@ -246,11 +267,60 @@ private static StorageInfo getStorageInfo(
return storageInfo;
}
+ /**
+ * If interruptionAware = true, select workers based on 2 main criteria: <br>
+ * 1. Workers that have no nextInterruptionNotice are the first priority and are included in the
+ * 1st pass for slot selection. <br>
+ * 2. Workers that have a later interruption notice are a little less deprioritized, and are
+ * included in the 2nd pass for slot selection. This is determined by nextInterruptionNotice above
+ * a certain percentage threshold.<br>
+ * All other workers are considered least priority, and are only included for slot selection in
+ * the worst case. <br>
+ */
+ static Tuple3<List<WorkerInfo>, List<WorkerInfo>, List<WorkerInfo>>
+ prioritizeWorkersBasedOnInterruptionNotice(
+ List<WorkerInfo> workers,
+ boolean shouldReplicate,
+ boolean shouldRackAware,
+ double percentileThreshold) {
+ Map<Boolean, List<WorkerInfo>> partitioned =
+ workers.stream().collect(Collectors.partitioningBy(WorkerInfo::hasInterruptionNotice));
+ List<WorkerInfo> workersWithInterruptions = partitioned.get(true);
+ List<WorkerInfo> workersWithoutInterruptions = partitioned.get(false);
+ // Timestamps towards the boundary of `percentileThreshold` might be the same. Given this
+ // is a stable sort, it makes sense to randomize these hosts so that the same hosts are not
+ // consistently selected.
+ Collections.shuffle(workersWithInterruptions);
+ workersWithInterruptions.sort(
+ Comparator.comparingLong(WorkerInfo::nextInterruptionNotice).reversed());
+ int requiredNodes =
+ (int) Math.floor((percentileThreshold * workersWithInterruptions.size()) / 100.0);
+
+ List<WorkerInfo> workersWithLateInterruptions =
+ new ArrayList<>(workersWithInterruptions.subList(0, requiredNodes));
+ List<WorkerInfo> workersWithEarlyInterruptions =
+ new ArrayList<>(
+ workersWithInterruptions.subList(requiredNodes, workersWithInterruptions.size()));
+ if (shouldReplicate && shouldRackAware) {
+ return Tuple3.apply(
+ generateRackAwareWorkers(workersWithoutInterruptions),
+ generateRackAwareWorkers(workersWithLateInterruptions),
Review Comment:
ahh, I see what you mean. Fixed :)
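
For anyone following the thread without the full diff, here is a rough standalone sketch of the three-tier split the hunk above performs. It is not the PR's code: `Worker`, `Tiers`, and `prioritize` are hypothetical stand-ins for `WorkerInfo`, `Tuple3`, and the real method, the rack-aware branch is left out, and treating a negative timestamp as "no interruption notice" is an assumption made only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class InterruptionPrioritySketch {
  // Hypothetical stand-in for WorkerInfo.
  static final class Worker {
    final String host;
    final long nextInterruptionNotice;

    Worker(String host, long nextInterruptionNotice) {
      this.host = host;
      this.nextInterruptionNotice = nextInterruptionNotice;
    }

    // Assumption for this sketch only: a negative value means no notice is scheduled.
    boolean hasInterruptionNotice() {
      return nextInterruptionNotice >= 0;
    }

    long nextInterruptionNotice() {
      return nextInterruptionNotice;
    }
  }

  // Hypothetical holder used here in place of Tuple3.
  static final class Tiers {
    final List<Worker> none;
    final List<Worker> late;
    final List<Worker> early;

    Tiers(List<Worker> none, List<Worker> late, List<Worker> early) {
      this.none = none;
      this.late = late;
      this.early = early;
    }
  }

  static Tiers prioritize(List<Worker> workers, double percentileThreshold) {
    Map<Boolean, List<Worker>> partitioned =
        workers.stream().collect(Collectors.partitioningBy(Worker::hasInterruptionNotice));
    List<Worker> withNotice = new ArrayList<>(partitioned.get(true));
    List<Worker> withoutNotice = new ArrayList<>(partitioned.get(false));

    // Shuffle first so that workers with equal timestamps do not keep their input
    // order after the stable sort below.
    Collections.shuffle(withNotice);
    // Latest interruption first.
    withNotice.sort(Comparator.comparingLong(Worker::nextInterruptionNotice).reversed());

    // Number of workers that fall into the "late interruption" tier.
    int requiredNodes = (int) Math.floor((percentileThreshold * withNotice.size()) / 100.0);

    List<Worker> late = new ArrayList<>(withNotice.subList(0, requiredNodes));
    List<Worker> early = new ArrayList<>(withNotice.subList(requiredNodes, withNotice.size()));
    return new Tiers(withoutNotice, late, early);
  }

  public static void main(String[] args) {
    List<Worker> workers =
        Arrays.asList(
            new Worker("a", -1L),
            new Worker("b", 100L),
            new Worker("c", 200L),
            new Worker("d", 300L),
            new Worker("e", 400L));
    Tiers tiers = prioritize(workers, 50.0);
    System.out.println("no notice: " + tiers.none.size());
    System.out.println("late: " + tiers.late.size());
    System.out.println("early: " + tiers.early.size());
  }
}
```

With a threshold of 50 and four workers carrying a notice, `requiredNodes` is 2, so the two workers with the latest timestamps form the second tier and the other two form the last-resort tier.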