This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new aa9dfd056 [CELEBORN-1005][BUG] Clean Expired App Dirs will delete the
running a…
aa9dfd056 is described below
commit aa9dfd05667275b8c2a3089ff3dcf15389348f8b
Author: sunjunjie <[email protected]>
AuthorDate: Mon Sep 25 23:20:48 2023 +0800
[CELEBORN-1005][BUG] Clean Expired App Dirs will delete the running a…
### What changes were proposed in this pull request?
While a reducer was reading shuffle data, the worker had already deleted the file it was reading:
```
2023-09-22 16:32:36,810 [storage-scheduler] INFO org.apache.celeborn.service.deploy.worker.storage.StorageManager[51]: Delete expired app dir /data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1.
2023-09-22 16:32:36,810 [Disk-cleaner-/data8-6] DEBUG org.apache.celeborn.service.deploy.worker.storage.StorageManager[47]: Deleted expired shuffle file /data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0.
2023-09-22 16:32:53,304 [fetch-server-11-31] DEBUG org.apache.celeborn.service.deploy.worker.FetchHandler[47]: Received chunk fetch request application_1689848866482_12296544_1-32 924-0-0 0 2147483647 get file info FileInfo{file=/data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0, chunkOffsets=0,558, userIdentifier=`default`.`default`, partitionType=REDUCE}
java.io.FileNotFoundException: /data8/rssdata/celeborn-worker/shuffle_data/application_1689848866482_12296544_1/32/924-0-0 (No such file or directory)
```
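The root cause is a race with the expired-app-dir cleaner. When a file writer is created, the shuffle directory is created on disk first, and only afterwards is the file registered in the `fileInfos` collection. If the cleaner takes its `shuffleKeySet` snapshot inside that window, the running app's shuffle key is not yet present. The cleaner then treats the app as expired and mistakenly deletes its newly created files. This patch registers the `FileInfo` in `fileInfos` before the directory is created, on both the HDFS path and the local-disk path.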
https://issues.apache.org/jira/browse/CELEBORN-1005
### Why are the changes needed?
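Bug fix: without it, the worker can delete shuffle files that belong to a still-running application, and later fetches of those files fail with `FileNotFoundException` (see the log above).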
### Does this PR introduce _any_ user-facing change?
### How was this patch tested?
Closes #1937 from wilsonjie/CELEBORN-1005.
Lead-authored-by: sunjunjie <[email protected]>
Co-authored-by: junjie.sun <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../deploy/worker/storage/StorageManager.scala | 20 ++++++++++----------
1 file changed, 10 insertions(+), 10 deletions(-)
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 20252e911..b1f850f8d 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -358,13 +358,14 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
if (dirs.isEmpty) {
val shuffleDir =
new Path(new Path(hdfsDir, conf.workerWorkingDir),
s"$appId/$shuffleId")
- FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission)
val fileInfo =
new FileInfo(
new Path(shuffleDir, fileName).toString,
userIdentifier,
partitionType,
partitionSplitEnabled)
+ fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName,
fileInfo)
+ FileSystem.mkdirs(StorageManager.hadoopFs, shuffleDir, hdfsPermission)
val hdfsWriter = partitionType match {
case PartitionType.MAP => new MapPartitionFileWriter(
fileInfo,
@@ -390,7 +391,6 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
hdfsWriter.setStorageManager(this)
hdfsWriter.setShuffleKey(shuffleKey)
}
- fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName,
fileInfo)
hdfsWriters.put(fileInfo.getFilePath, hdfsWriter)
return hdfsWriter
} else {
@@ -399,6 +399,14 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
val shuffleDir = new File(dir, s"$appId/$shuffleId")
val file = new File(shuffleDir, fileName)
try {
+ val fileInfo =
+ new FileInfo(
+ file.getAbsolutePath,
+ userIdentifier,
+ partitionType,
+ partitionSplitEnabled)
+ fileInfo.setMountPoint(mountPoint)
+ fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName,
fileInfo)
shuffleDir.mkdirs()
if (file.exists()) {
throw new FileAlreadyExistsException(
@@ -410,13 +418,6 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
s"Create shuffle data file ${file.getAbsolutePath} failed!")
}
}
- val fileInfo =
- new FileInfo(
- file.getAbsolutePath,
- userIdentifier,
- partitionType,
- partitionSplitEnabled)
- fileInfo.setMountPoint(mountPoint)
val fileWriter = partitionType match {
case PartitionType.MAP => new MapPartitionFileWriter(
fileInfo,
@@ -445,7 +446,6 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
deviceMonitor.registerFileWriter(fileWriter)
val map = workingDirWriters.computeIfAbsent(dir,
workingDirWriterListFunc)
map.put(fileInfo.getFilePath, fileWriter)
- fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName,
fileInfo)
location.getStorageInfo.setMountPoint(mountPoint)
logDebug(s"location $location set disk hint to
${location.getStorageInfo} ")
return fileWriter