akpatnam25 commented on code in PR #3347:
URL: https://github.com/apache/celeborn/pull/3347#discussion_r2196666568
########## master/src/main/java/org/apache/celeborn/service/deploy/master/SlotsAllocator.java: ##########

```diff
@@ -246,11 +267,60 @@ private static StorageInfo getStorageInfo(
     return storageInfo;
   }
+  /**
+   * If interruptionAware = true, select workers based on 2 main criteria: <br>
+   * 1. Workers that have no nextInterruptionNotice are the first priority and are included in the
+   * 1st pass for slot selection. <br>
+   * 2. Workers that have a later interruption notice are a little less deprioritized, and are
+   * included in the 2nd pass for slot selection. This is determined by nextInterruptionNotice above
+   * a certain percentage threshold.<br>
+   * All other workers are considered least priority, and are only included for slot selection in
+   * the worst case. <br>
+   */
+  static Tuple3<List<WorkerInfo>, List<WorkerInfo>, List<WorkerInfo>>
+      prioritizeWorkersBasedOnInterruptionNotice(
+          List<WorkerInfo> workers,
+          boolean shouldReplicate,
+          boolean shouldRackAware,
+          double percentileThreshold) {
+    Map<Boolean, List<WorkerInfo>> partitioned =
+        workers.stream().collect(Collectors.partitioningBy(WorkerInfo::hasInterruptionNotice));
+    List<WorkerInfo> workersWithInterruptions = partitioned.get(true);
+    List<WorkerInfo> workersWithoutInterruptions = partitioned.get(false);
+    // Timestamps towards the boundary of `percentileThreshold` might be the same. Given this
+    // is a stable sort, it makes sense to randomize these hosts so that the same hosts are not
+    // consistently selected.
+    Collections.shuffle(workersWithInterruptions);
+    workersWithInterruptions.sort(
+        Comparator.comparingLong(WorkerInfo::nextInterruptionNotice).reversed());
+    int requiredNodes =
+        (int) Math.floor((percentileThreshold * workersWithInterruptions.size()) / 100.0);
+
+    List<WorkerInfo> workersWithLateInterruptions =
+        new ArrayList<>(workersWithInterruptions.subList(0, requiredNodes));
+    List<WorkerInfo> workersWithEarlyInterruptions =
+        new ArrayList<>(
+            workersWithInterruptions.subList(requiredNodes, workersWithInterruptions.size()));
+    if (shouldReplicate && shouldRackAware) {
+      return Tuple3.apply(
+          generateRackAwareWorkers(workersWithoutInterruptions),
+          generateRackAwareWorkers(workersWithLateInterruptions),
```

Review Comment:
@mridulm The combined primary candidate list has both `workersWithoutInterruptions` and `workersWithLateInterruptions`. So I think we still need to call `generateRackAwareWorkers` on that combined list, right?

```java
List<WorkerInfo> primaryWorkerCandidates = new ArrayList<>(workersWithoutInterruptions);
primaryWorkerCandidates.addAll(workersWithLateInterruptions);
if (shouldReplicate && shouldRackAware) {
  primaryWorkerCandidates = generateRackAwareWorkers(primaryWorkerCandidates);
}
```
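To make the question concrete, here is a minimal, self-contained sketch of the suggested ordering: split workers into no-notice, late-notice and early-notice tiers, merge the first two into one primary candidate list, and only then apply a rack-aware ordering once. Everything in it is hypothetical: `Worker` is a stand-in for `WorkerInfo`, `Long.MAX_VALUE` is assumed to mean "no interruption notice", `primaryAndFallback` is an illustrative name, and `interleaveByRack` is a simple round-robin over racks, not Celeborn's actual `generateRackAwareWorkers`. The `shouldReplicate && shouldRackAware` condition is collapsed into a single `rackAware` flag.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class InterruptionTiers {

  // Hypothetical stand-in for WorkerInfo with only the fields this sketch needs.
  // Assumption: Long.MAX_VALUE means "no interruption notice".
  public record Worker(String host, String rack, long nextInterruptionNotice) {
    boolean hasInterruptionNotice() {
      return nextInterruptionNotice != Long.MAX_VALUE;
    }
  }

  // Hypothetical rack-aware ordering (NOT Celeborn's generateRackAwareWorkers):
  // round-robin across racks so consecutive workers come from different racks
  // whenever possible.
  static List<Worker> interleaveByRack(List<Worker> workers) {
    Map<String, Deque<Worker>> byRack = new LinkedHashMap<>();
    for (Worker w : workers) {
      byRack.computeIfAbsent(w.rack(), r -> new ArrayDeque<>()).add(w);
    }
    List<Worker> out = new ArrayList<>(workers.size());
    while (out.size() < workers.size()) {
      for (Deque<Worker> rackQueue : byRack.values()) {
        Worker w = rackQueue.poll();
        if (w != null) {
          out.add(w);
        }
      }
    }
    return out;
  }

  // Returns [primary candidates, early-interruption fallback workers].
  public static List<List<Worker>> primaryAndFallback(
      List<Worker> workers, boolean rackAware, double percentileThreshold) {
    Map<Boolean, List<Worker>> partitioned =
        workers.stream().collect(Collectors.partitioningBy(Worker::hasInterruptionNotice));
    List<Worker> withNotice = new ArrayList<>(partitioned.get(true));
    List<Worker> withoutNotice = partitioned.get(false);

    // Shuffle before the stable sort so ties near the threshold are broken randomly.
    Collections.shuffle(withNotice);
    // Latest notice first: the head of the list is interrupted last.
    withNotice.sort(Comparator.comparingLong(Worker::nextInterruptionNotice).reversed());
    int late = (int) Math.floor(percentileThreshold * withNotice.size() / 100.0);

    // Merge the "no notice" and "late notice" tiers first, then apply the
    // rack-aware ordering once, to the combined list.
    List<Worker> primary = new ArrayList<>(withoutNotice);
    primary.addAll(withNotice.subList(0, late));
    if (rackAware) {
      primary = interleaveByRack(primary);
    }
    List<Worker> fallback = new ArrayList<>(withNotice.subList(late, withNotice.size()));
    return List.of(primary, fallback);
  }
}
```

With this stand-in ordering, for workers `a` and `b` on rack `r1` (no notice) and `c` on rack `r2` (late notice), ordering each tier separately and concatenating gives `a, b, c`, leaving two same-rack workers adjacent, while ordering the merged list gives `a, c, b`. Whether the real `generateRackAwareWorkers` shows the same effect depends on its implementation.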