This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new daec44704 [CELEBORN-1005][BUG] Clean Expired App Dirs will delete the 
running a…
daec44704 is described below

commit daec44704ef5fa4b04174001aec91395e60a7cbd
Author: sunjunjie <[email protected]>
AuthorDate: Mon Sep 25 23:20:48 2023 +0800

    [CELEBORN-1005][BUG] Clean Expired App Dirs will delete the running a…
    
    ### What changes were proposed in this pull request?
    While shuffle data was being read, the fetch failed because the shuffle file had already been deleted by the expired-app cleaner:
    
    `2023-09-22 16:32:36,810 [storage-scheduler] INFO  
org.apache.celeborn.service.deploy.worker.storage.StorageManager[51]: Delete 
expired app dir 
/data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1.
    2023-09-22 16:32:36,810 [Disk-cleaner-/data8-6] DEBUG 
org.apache.celeborn.service.deploy.worker.storage.StorageManager[47]: Deleted 
expired shuffle file 
/data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0.
    2023-09-22 16:32:53,304 [fetch-server-11-31] DEBUG 
org.apache.celeborn.service.deploy.worker.FetchHandler[47]: Received chunk 
fetch request application_1689848866482_12296544_1-32 924-0-0 0 2147483647 get 
file info 
FileInfo{file=/data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0,
 chunkOffsets=0,558, userIdentifier=`default`.`default`, partitionType=REDUCE}
    java.io.FileNotFoundException: 
/data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0
 (No such file or directory)`
    
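    This happens because, when a new shuffle file is created, its directory is created on disk first and only afterwards is the file's FileInfo added to the fileInfos collection. If the expired-app cleaner runs in between, the shuffleKeySet it reads does not yet contain the running application, so the application's freshly created directory is treated as expired and its files are mistakenly deleted. This change registers the FileInfo in fileInfos before the directory is created.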
    
    https://issues.apache.org/jira/browse/CELEBORN-1005
    ### Why are the changes needed?
    Bug fix.
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1937 from wilsonjie/CELEBORN-1005.
    
    Lead-authored-by: sunjunjie <[email protected]>
    Co-authored-by: junjie.sun <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit aa9dfd05667275b8c2a3089ff3dcf15389348f8b)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../deploy/worker/storage/StorageManager.scala       | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index f9575c780..1147d1b34 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -355,13 +355,14 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       if (dirs.isEmpty) {
         val shuffleDir =
           new Path(new Path(hdfsDir, conf.workerWorkingDir), 
s"$appId/$shuffleId")
-        FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission)
         val fileInfo =
           new FileInfo(
             new Path(shuffleDir, fileName).toString,
             userIdentifier,
             partitionType,
             partitionSplitEnabled)
+        fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, 
fileInfo)
+        FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission)
         val hdfsWriter = partitionType match {
           case PartitionType.MAP => new MapPartitionFileWriter(
               fileInfo,
@@ -387,7 +388,6 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           hdfsWriter.setStorageManager(this)
           hdfsWriter.setShuffleKey(shuffleKey)
         }
-        fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, 
fileInfo)
         hdfsWriters.put(fileInfo.getFilePath, hdfsWriter)
         return hdfsWriter
       } else {
@@ -396,6 +396,14 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         val shuffleDir = new File(dir, s"$appId/$shuffleId")
         val file = new File(shuffleDir, fileName)
         try {
+          val fileInfo =
+            new FileInfo(
+              file.getAbsolutePath,
+              userIdentifier,
+              partitionType,
+              partitionSplitEnabled)
+          fileInfo.setMountPoint(mountPoint)
+          fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, 
fileInfo)
           shuffleDir.mkdirs()
           if (file.exists()) {
             throw new FileAlreadyExistsException(
@@ -407,13 +415,6 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
                 s"Create shuffle data file ${file.getAbsolutePath} failed!")
             }
           }
-          val fileInfo =
-            new FileInfo(
-              file.getAbsolutePath,
-              userIdentifier,
-              partitionType,
-              partitionSplitEnabled)
-          fileInfo.setMountPoint(mountPoint)
           val fileWriter = partitionType match {
             case PartitionType.MAP => new MapPartitionFileWriter(
                 fileInfo,
@@ -442,7 +443,6 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
           deviceMonitor.registerFileWriter(fileWriter)
           val map = workingDirWriters.computeIfAbsent(dir, 
workingDirWriterListFunc)
           map.put(fileInfo.getFilePath, fileWriter)
-          fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, 
fileInfo)
           location.getStorageInfo.setMountPoint(mountPoint)
           logDebug(s"location $location set disk hint to 
${location.getStorageInfo} ")
           return fileWriter

Reply via email to