This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 164c66349 [CELEBORN-1696] StorageManager#cleanFile should remove file 
info
164c66349 is described below

commit 164c6634924e9537a9668df85f690e1ec41be8b5
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Nov 7 22:50:02 2024 +0800

    [CELEBORN-1696] StorageManager#cleanFile should remove file info
    
    ### What changes were proposed in this pull request?
    
    Remove file ino in  `StorageManager#cleanFile`
    
    ### Why are the changes needed?
    
     StorageManager#cleanFile should remove file info
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    No need.
    
    Closes #2887 from reswqa/fix_clean_file.
    
    Authored-by: Weijie Guo <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../deploy/worker/storage/StorageManager.scala       | 20 +++++++++++++++++++-
 1 file changed, 19 insertions(+), 1 deletion(-)

diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 16e65f145..517bcb971 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -525,6 +525,24 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     null
   }
 
+  private def removeFileInfo(shuffleKey: String, fileName: String): FileInfo = 
{
+    val memoryShuffleMap = memoryFileInfos.get(shuffleKey)
+    if (memoryShuffleMap != null) {
+      return memoryShuffleMap.remove(fileName)
+    }
+
+    val diskShuffleMap = diskFileInfos.get(shuffleKey)
+    if (diskShuffleMap != null) {
+      if (workerGracefulShutdown) {
+        val committedFileInfoMap = committedFileInfos.get(shuffleKey)
+        committedFileInfoMap.remove(fileName)
+      }
+      return diskShuffleMap.remove(fileName)
+    }
+
+    null
+  }
+
   def getFetchTimeMetric(file: File): TimeWindow = {
     if (diskInfos != null) {
       val diskInfo = 
diskInfos.get(DeviceInfo.getMountPoint(file.getAbsolutePath, diskInfos))
@@ -554,7 +572,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
   }
 
   def cleanFile(shuffleKey: String, fileName: String): Unit = {
-    cleanFileInternal(shuffleKey, getFileInfo(shuffleKey, fileName))
+    cleanFileInternal(shuffleKey, removeFileInfo(shuffleKey, fileName))
   }
 
   def cleanFileInternal(shuffleKey: String, fileInfo: FileInfo): Boolean = {

Reply via email to