This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new a77a8eb8f [CELEBORN-881][BUG] StorageManager clean up thread may delete new app directories
a77a8eb8f is described below

commit a77a8eb8fddb331aaaf6001356cd1eaab42ff4df
Author: hongzhaoyang <[email protected]>
AuthorDate: Fri Sep 8 20:41:04 2023 +0800

    [CELEBORN-881][BUG] StorageManager clean up thread may delete new app directories
    
    ### What changes were proposed in this pull request?
    
    The worker throws FileNotFoundException while fetching a chunk:
    ```
    java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/871-0-0 (No such file or directory)
    ```
    The files are deleted by the storage-scheduler thread before the shuffle files are committed:
    ```
    2023-09-07 19:38:16,506 [INFO] [dispatcher-event-loop-44] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Create file /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/986-0-0 success
    2023-09-07 19:38:16,506 [INFO] [dispatcher-event-loop-44] - org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51) -Reserved 29 primary location and 0 replica location for application_1693206141914_540726_1-1
    2023-09-07 19:38:16,537 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
    2023-09-07 19:38:16,580 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
    2023-09-07 19:38:16,629 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
    2023-09-07 19:38:16,661 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
    2023-09-07 19:38:16,681 [INFO] [storage-scheduler] - org.apache.celeborn.service.deploy.worker.storage.StorageManager -Logging.scala(51) -Delete expired app dir /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
    2023-09-07 19:38:17,355 [INFO] [dispatcher-event-loop-12] - org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51) -Start commitFiles for application_1693206141914_540726_1-1
    2023-09-07 19:38:17,362 [INFO] [async-reply] - org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51) -CommitFiles for application_1693206141914_540726_1-1 success with 29 committed primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0 committed replica partitions, 0 empty replica partitions, 0 failed replica partitions.
    java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/976-0-0 (No such file or directory)
    java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/482-0-0 (No such file or directory)
    java.io.FileNotFoundException: /xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/658-0-0 (No such file or directory)
    ```
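    This method may have a concurrency problem: it reads the active app IDs before it lists the app directories, so a directory created between the two steps can be treated as expired.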
    ``` scala
    private def cleanupExpiredAppDirs(): Unit = {
      val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
      disksSnapshot().filter(_.status != DiskStatus.IO_HANG).foreach { diskInfo =>
        diskInfo.dirs.foreach {
          case workingDir if workingDir.exists() =>
            workingDir.listFiles().foreach { appDir =>
              // Don't delete shuffleKey's data that exist correct shuffle file info.
              if (!appIds.contains(appDir.getName)) {
                val threadPool = diskOperators.get(diskInfo.mountPoint)
                deleteDirectory(appDir, threadPool)
                logInfo(s"Delete expired app dir $appDir.")
              }
            }
          // workingDir not exist when initializing worker on new disk
          case _ => // do nothing
        }
      }
    }
    ```
    We should find all app directories first, then get the active shuffle keys.
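    
    The effect of the ordering can be illustrated with a minimal, self-contained
    sketch. It is not the Celeborn code: `WorkerState`, `registerNewApp`, and
    `dirsToDelete` are hypothetical names, and the sketch assumes an application
    is already in the active set by the time its directory becomes visible. A new
    application arrives between the two reads that the cleanup performs.
    
    ```scala
    import scala.collection.mutable
    
    object CleanupOrderingSketch {
      // Hypothetical stand-in for the worker's state; not a Celeborn class.
      final class WorkerState {
        // App directories currently on disk; "app_old" has no active shuffle.
        val dirsOnDisk: mutable.Set[String] = mutable.Set("app_old")
        // App IDs that currently have registered shuffle keys.
        val activeApps: mutable.Set[String] = mutable.Set.empty[String]
    
        // Assumption: the app is registered before its directory is created.
        def registerNewApp(appId: String): Unit = {
          activeApps += appId
          dirsOnDisk += appId
        }
      }
    
      // Returns the directories the cleanup would delete. "app_new" arrives
      // between the two reads, modelling a reservation during cleanup.
      def dirsToDelete(state: WorkerState, listDirsFirst: Boolean): Set[String] = {
        val (dirs, apps) =
          if (listDirsFirst) {
            val d = state.dirsOnDisk.toSet   // 1. snapshot the directories
            state.registerNewApp("app_new")  //    new app arrives
            (d, state.activeApps.toSet)      // 2. read the active app IDs
          } else {
            val a = state.activeApps.toSet   // 1. read the active app IDs
            state.registerNewApp("app_new")  //    new app arrives
            (state.dirsOnDisk.toSet, a)      // 2. list the directories
          }
        dirs.filterNot(apps.contains)
      }
    
      def main(args: Array[String]): Unit = {
        // App IDs read first: app_new is listed but unknown, so it is deleted.
        println(dirsToDelete(new WorkerState, listDirsFirst = false))
        // Directories listed first: only app_old is selected for deletion.
        println(dirsToDelete(new WorkerState, listDirsFirst = true))
      }
    }
    ```
    
    With the directories snapshotted first, a directory created later is simply
    not in the snapshot, so this run of the cleanup cannot select it.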
    
    https://issues.apache.org/jira/browse/CELEBORN-881
    
    ### Why are the changes needed?
    Bugfix.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Passes GA and manual test.
    
    Closes #1889 from zy-jordan/CELEBORN-881.
    
    Lead-authored-by: hongzhaoyang <[email protected]>
    Co-authored-by: Keyong Zhou <[email protected]>
    Co-authored-by: Keyong Zhou <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../deploy/worker/storage/StorageManager.scala     | 27 +++++++++++-----------
 1 file changed, 14 insertions(+), 13 deletions(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index aa9b72a1f..64538f1fc 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -594,20 +594,21 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
     TimeUnit.MINUTES)
 
   private def cleanupExpiredAppDirs(): Unit = {
+    val diskInfoAndAppDirs = disksSnapshot()
+      .filter(_.status != DiskStatus.IO_HANG)
+      .map { case diskInfo =>
+        (diskInfo, diskInfo.dirs.filter(_.exists).flatMap(_.listFiles()))
+      }
     val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
-    disksSnapshot().filter(_.status != DiskStatus.IO_HANG).foreach { diskInfo =>
-      diskInfo.dirs.foreach {
-        case workingDir if workingDir.exists() =>
-          workingDir.listFiles().foreach { appDir =>
-            // Don't delete shuffleKey's data that exist correct shuffle file info.
-            if (!appIds.contains(appDir.getName)) {
-              val threadPool = diskOperators.get(diskInfo.mountPoint)
-              deleteDirectory(appDir, threadPool)
-              logInfo(s"Delete expired app dir $appDir.")
-            }
-          }
-        // workingDir not exist when initializing worker on new disk
-        case _ => // do nothing
+
+    diskInfoAndAppDirs.foreach { case (diskInfo, appDirs) =>
+      appDirs.foreach { appDir =>
+        // Don't delete shuffleKey's data that exist correct shuffle file info.
+        if (!appIds.contains(appDir.getName)) {
+          val threadPool = diskOperators.get(diskInfo.mountPoint)
+          deleteDirectory(appDir, threadPool)
+          logInfo(s"Delete expired app dir $appDir.")
+        }
       }
     }
   }
