This is an automated email from the ASF dual-hosted git repository.

nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 654875924 [CELEBORN-1414] PartitionFilesSorter resolve DiskFileInfo 
without sorting lock
654875924 is described below

commit 6548759243813b878f2fccccf8cc3fd822019a59
Author: SteNicholas <[email protected]>
AuthorDate: Wed May 15 17:16:30 2024 +0800

    [CELEBORN-1414] PartitionFilesSorter resolve DiskFileInfo without sorting 
lock
    
    ### What changes were proposed in this pull request?
    
    `PartitionFilesSorter` now calls `resolve` to obtain the `DiskFileInfo`
    without holding the `sorting` lock. This narrows the lock scope and
    improves the performance of `getSortedFileInfo`.
    
    ### Why are the changes needed?
    
    `PartitionFilesSorter#resolve` is thread-safe, yet `PartitionFilesSorter`
    currently invokes `resolve` while holding the `sorting` lock, even though
    resolving the `DiskFileInfo` does not require that lock. Resolving the
    `DiskFileInfo` outside the `sorting` lock therefore improves the
    performance of `getSortedFileInfo`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    GA.
    
    Closes #2498 from SteNicholas/CELEBORN-1414.
    
    Authored-by: SteNicholas <[email protected]>
    Signed-off-by: SteNicholas <[email protected]>
---
 .../worker/storage/PartitionFilesSorter.java       | 47 ++++++++++------------
 1 file changed, 22 insertions(+), 25 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
index 8eaa279f6..0025e6482 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionFilesSorter.java
@@ -206,18 +206,11 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
 
     String sortedFilePath = Utils.getSortedFilePath(fileInfo.getFilePath());
     String indexFilePath = Utils.getIndexFilePath(fileInfo.getFilePath());
+    boolean fileSorting = true;
     synchronized (sorting) {
       if (sorted.contains(fileId)) {
-        return resolve(
-            shuffleKey,
-            fileId,
-            userIdentifier,
-            sortedFilePath,
-            indexFilePath,
-            startMapIndex,
-            endMapIndex);
-      }
-      if (!sorting.contains(fileId)) {
+        fileSorting = false;
+      } else if (!sorting.contains(fileId)) {
         try {
           FileSorter fileSorter = new FileSorter(fileInfo, fileId, shuffleKey);
           sorting.add(fileId);
@@ -235,25 +228,29 @@ public class PartitionFilesSorter extends 
ShuffleRecoverHelper {
       }
     }
 
-    long sortStartTime = System.currentTimeMillis();
-    while (!sorted.contains(fileId)) {
-      if (sorting.contains(fileId)) {
-        try {
-          Thread.sleep(50);
-          if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
-            logger.error("Sorting file {} timeout after {}ms", fileId, 
sortTimeout);
+    if (fileSorting) {
+      long sortStartTime = System.currentTimeMillis();
+      while (!sorted.contains(fileId)) {
+        if (sorting.contains(fileId)) {
+          try {
+            Thread.sleep(50);
+            if (System.currentTimeMillis() - sortStartTime > sortTimeout) {
+              logger.error("Sorting file {} timeout after {}ms", fileId, 
sortTimeout);
+              throw new IOException(
+                  "Sort file " + fileInfo.getFilePath() + " timeout after " + 
sortTimeout);
+            }
+          } catch (InterruptedException e) {
+            logger.error(
+                "Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
             throw new IOException(
-                "Sort file " + fileInfo.getFilePath() + " timeout after " + 
sortTimeout);
+                "Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
           }
-        } catch (InterruptedException e) {
-          logger.error("Sorter scheduler thread is interrupted means worker is 
shutting down.", e);
+        } else {
+          logger.debug(
+              "Sorting shuffle file for {} {} failed.", shuffleKey, 
fileInfo.getFilePath());
           throw new IOException(
-              "Sorter scheduler thread is interrupted means worker is shutting 
down.", e);
+              "Sorting shuffle file for " + shuffleKey + " " + 
fileInfo.getFilePath() + " failed.");
         }
-      } else {
-        logger.debug("Sorting shuffle file for {} {} failed.", shuffleKey, 
fileInfo.getFilePath());
-        throw new IOException(
-            "Sorting shuffle file for " + shuffleKey + " " + 
fileInfo.getFilePath() + " failed.");
       }
     }
 

Reply via email to