saurabhd336 commented on code in PR #3593:
URL: https://github.com/apache/celeborn/pull/3593#discussion_r2757501772


##########
worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java:
##########
@@ -313,6 +322,93 @@ public FileInfo getSortedFileInfo(
     }
   }
 
+  // Async variant: returns a future that completes with sorted FileInfo without blocking Netty threads.
+  public CompletableFuture<FileInfo> getSortedFileInfoAsync(
+      String shuffleKey,
+      String fileName,
+      FileInfo fileInfo,
+      int startMapIndex,
+      int endMapIndex,
+      Executor executor) {
+    if (fileInfo instanceof MemoryFileInfo) {
+      return CompletableFuture.completedFuture(getSortedMemoryFileInfo(
+              (MemoryFileInfo) fileInfo,
+              startMapIndex,
+              endMapIndex));
+    } else {
+      DiskFileInfo diskFileInfo = ((DiskFileInfo) fileInfo);
+      String fileId = shuffleKey + "-" + fileName;
+      Set<String> sorted =
+          sortedShuffleFiles.computeIfAbsent(shuffleKey, v -> ConcurrentHashMap.newKeySet());
+      Set<String> sorting =
+          sortingShuffleFiles.computeIfAbsent(shuffleKey, v -> ConcurrentHashMap.newKeySet());
+
+      String sortedFilePath = Utils.getSortedFilePath(diskFileInfo.getFilePath());
+      String indexFilePath = Utils.getIndexFilePath(diskFileInfo.getFilePath());
+
+      synchronized (sorting) {
+        if (!sorted.contains(fileId) && !sorting.contains(fileId)) {
+          try {
+            FileSorter fileSorter = new FileSorter(diskFileInfo, fileId, shuffleKey);
+            sorting.add(fileId);
+            logger.debug(
+                "Adding sorter to sort queue (async) shuffle key {}, file name {}",
+                shuffleKey,
+                fileName);
+            shuffleSortTaskDeque.put(fileSorter);
+          } catch (InterruptedException e) {
+            CompletableFuture<FileInfo> failed = new CompletableFuture<>();
+            failed.completeExceptionally(
+                new IOException(
+                    "Sort scheduler thread is interrupted means worker is shutting down.", e));
+            return failed;
+          } catch (IOException e) {
+            CompletableFuture<FileInfo> failed = new CompletableFuture<>();
+            failed.completeExceptionally(new IOException("File sorter access DFS failed.", e));
+            return failed;
+          }
+        }
+      }
+
+      if (sorted.contains(fileId)) {
+        try {
+          return CompletableFuture.completedFuture(
+              resolve(

Review Comment:
   This should already be running on the caller thread, i.e. the Netty thread: `resolve(...)` is evaluated before `CompletableFuture.completedFuture` wraps its result.
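
   To illustrate the point about the caller thread, here is a minimal standalone sketch, not the PR's code. `resolveStandIn` and the `sort-pool` executor are hypothetical names standing in for `resolve(...)` and whatever executor the caller passes. It only shows that an argument to `completedFuture` is computed on the calling thread, while `supplyAsync` with an executor computes it elsewhere.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallerThreadSketch {
  // Hypothetical stand-in for resolve(...): reports which thread executed it.
  static String resolveStandIn() {
    return Thread.currentThread().getName();
  }

  // Eager: Java evaluates the argument first, so resolveStandIn() runs on the
  // calling thread and completedFuture only wraps the finished value.
  static CompletableFuture<String> eager() {
    return CompletableFuture.completedFuture(resolveStandIn());
  }

  // Deferred: the supplier is handed to the executor and runs on one of its threads.
  static CompletableFuture<String> deferred(Executor executor) {
    return CompletableFuture.supplyAsync(CallerThreadSketch::resolveStandIn, executor);
  }

  public static void main(String[] args) {
    // Assumed single-threaded pool with a recognizable thread name.
    ExecutorService pool =
        Executors.newSingleThreadExecutor(r -> new Thread(r, "sort-pool"));
    try {
      System.out.println("caller:   " + Thread.currentThread().getName());
      System.out.println("eager:    " + eager().join());
      System.out.println("deferred: " + deferred(pool).join());
    } finally {
      pool.shutdown();
    }
  }
}
```

   Whether the synchronous path matters here depends on how much work `resolve(...)` does, which the quoted hunk does not show.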


