This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 8f5716f18 [CELEBORN-1696] StorageManager#cleanFile should remove file
info
8f5716f18 is described below
commit 8f5716f180f88493bd3f94f646ec7a08d99db226
Author: Weijie Guo <[email protected]>
AuthorDate: Thu Nov 7 22:50:02 2024 +0800
[CELEBORN-1696] StorageManager#cleanFile should remove file info
### What changes were proposed in this pull request?
Remove file ino in `StorageManager#cleanFile`
### Why are the changes needed?
StorageManager#cleanFile should remove file info
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
No need.
Closes #2887 from reswqa/fix_clean_file.
Authored-by: Weijie Guo <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 164c6634924e9537a9668df85f690e1ec41be8b5)
Signed-off-by: Shuang <[email protected]>
---
.../deploy/worker/storage/StorageManager.scala | 20 +++++++++++++++++++-
1 file changed, 19 insertions(+), 1 deletion(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 1becfed76..a582fe4c3 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -475,6 +475,24 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
null
}
+ private def removeFileInfo(shuffleKey: String, fileName: String): FileInfo =
{
+ val memoryShuffleMap = memoryFileInfos.get(shuffleKey)
+ if (memoryShuffleMap != null) {
+ return memoryShuffleMap.remove(fileName)
+ }
+
+ val diskShuffleMap = diskFileInfos.get(shuffleKey)
+ if (diskShuffleMap != null) {
+ if (workerGracefulShutdown) {
+ val committedFileInfoMap = committedFileInfos.get(shuffleKey)
+ committedFileInfoMap.remove(fileName)
+ }
+ return diskShuffleMap.remove(fileName)
+ }
+
+ null
+ }
+
def getFetchTimeMetric(file: File): TimeWindow = {
if (diskInfos != null) {
val diskInfo =
diskInfos.get(DeviceInfo.getMountPoint(file.getAbsolutePath, diskInfos))
@@ -503,7 +521,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
}
def cleanFile(shuffleKey: String, fileName: String): Unit = {
- cleanFileInternal(shuffleKey, getFileInfo(shuffleKey, fileName))
+ cleanFileInternal(shuffleKey, removeFileInfo(shuffleKey, fileName))
}
def cleanFileInternal(shuffleKey: String, fileInfo: FileInfo): Boolean = {