This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 1f386d2e3 [CELEBORN-881][BUG] StorageManager clean up thread may delete new app directories
1f386d2e3 is described below
commit 1f386d2e33e8b2ae5932353074efda28021d11bf
Author: hongzhaoyang <[email protected]>
AuthorDate: Fri Sep 8 20:41:04 2023 +0800
[CELEBORN-881][BUG] StorageManager clean up thread may delete new app directories
### What changes were proposed in this pull request?
The worker throws a FileNotFoundException while fetching a chunk:
```
java.io.FileNotFoundException:
/xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/871-0-0
(No such file or directory)
```
Before the shuffle files are committed, they are deleted by the storage-scheduler thread:
```
2023-09-07 19:38:16,506 [INFO] [dispatcher-event-loop-44] -
org.apache.celeborn.service.deploy.worker.storage.StorageManager
-Logging.scala(51) -Create file
/xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/986-0-0
success
2023-09-07 19:38:16,506 [INFO] [dispatcher-event-loop-44] -
org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51)
-Reserved 29 primary location and 0 replica location for
application_1693206141914_540726_1-1
2023-09-07 19:38:16,537 [INFO] [storage-scheduler] -
org.apache.celeborn.service.deploy.worker.storage.StorageManager
-Logging.scala(51) -Delete expired app dir
/xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:16,580 [INFO] [storage-scheduler] -
org.apache.celeborn.service.deploy.worker.storage.StorageManager
-Logging.scala(51) -Delete expired app dir
/xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:16,629 [INFO] [storage-scheduler] -
org.apache.celeborn.service.deploy.worker.storage.StorageManager
-Logging.scala(51) -Delete expired app dir
/xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:16,661 [INFO] [storage-scheduler] -
org.apache.celeborn.service.deploy.worker.storage.StorageManager
-Logging.scala(51) -Delete expired app dir
/xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:16,681 [INFO] [storage-scheduler] -
org.apache.celeborn.service.deploy.worker.storage.StorageManager
-Logging.scala(51) -Delete expired app dir
/xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1.
2023-09-07 19:38:17,355 [INFO] [dispatcher-event-loop-12] -
org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51) -Start
commitFiles for application_1693206141914_540726_1-1
2023-09-07 19:38:17,362 [INFO] [async-reply] -
org.apache.celeborn.service.deploy.worker.Controller -Logging.scala(51)
-CommitFiles for application_1693206141914_540726_1-1 success with 29 committed
primary partitions, 0 empty primary partitions, 0 failed primary partitions, 0
committed replica partitions, 0 empty replica partitions, 0 failed replica
partitions.
java.io.FileNotFoundException:
/xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/976-0-0
(No such file or directory)
java.io.FileNotFoundException:
/xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/482-0-0
(No such file or directory)
java.io.FileNotFoundException:
/xxx/celeborn-worker/shuffle_data/subdir_0/application_1693206141914_540726_1/1/658-0-0
(No such file or directory)
```
The following method may have a concurrency problem:
``` scala
private def cleanupExpiredAppDirs(): Unit = {
  val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
  disksSnapshot().filter(_.status != DiskStatus.IO_HANG).foreach { diskInfo =>
    diskInfo.dirs.foreach {
      case workingDir if workingDir.exists() =>
        workingDir.listFiles().foreach { appDir =>
          // Don't delete shuffleKey's data that exist correct shuffle file info.
          if (!appIds.contains(appDir.getName)) {
            val threadPool = diskOperators.get(diskInfo.mountPoint)
            deleteDirectory(appDir, threadPool)
            logInfo(s"Delete expired app dir $appDir.")
          }
        }
      // workingDir not exist when initializing worker on new disk
      case _ => // do nothing
    }
  }
}
```
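We should list all app directories first, and only then get the active shuffle keys. With the old order, an app that registers after `appIds` is computed but before its directory is listed is missing from `appIds`, so its directory is treated as expired and deleted.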
https://issues.apache.org/jira/browse/CELEBORN-881
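The effect of the read order can be sketched with a small in-memory model. This is an illustration only, not Celeborn code: `FakeWorker`, `register`, `cleanupKeysFirst`, `cleanupDirsFirst`, and `run` are hypothetical names, registration is simplified to a single step, and the `interleave` callback stands in for a concurrent registration landing between the two reads.

```scala
import scala.collection.mutable

object CleanupOrderingSketch {
  // Hypothetical model: `dirs` stands in for app directories on disk,
  // `activeApps` for the app IDs derived from shuffleKeySet().
  final class FakeWorker {
    val dirs: mutable.Set[String] = mutable.Set.empty
    val activeApps: mutable.Set[String] = mutable.Set.empty

    // Simplification (assumption): a registration records the app and
    // creates its directory in one step.
    def register(app: String): Unit = { activeApps += app; dirs += app }
  }

  // Old order: snapshot the active apps, then list the directories.
  // `interleave` runs between the two reads to model a concurrent registration.
  def cleanupKeysFirst(w: FakeWorker, interleave: () => Unit): Set[String] = {
    val appIds = w.activeApps.toSet
    interleave()
    val listed = w.dirs.toSet
    val deleted = listed.diff(appIds)
    w.dirs --= deleted
    deleted
  }

  // New order: list the directories, then snapshot the active apps.
  def cleanupDirsFirst(w: FakeWorker, interleave: () => Unit): Set[String] = {
    val listed = w.dirs.toSet
    interleave()
    val appIds = w.activeApps.toSet
    val deleted = listed.diff(appIds)
    w.dirs --= deleted
    deleted
  }

  // Runs one cleanup pass while "app_new" registers between the two reads.
  def run(cleanup: (FakeWorker, () => Unit) => Set[String]): Set[String] = {
    val w = new FakeWorker
    w.register("app_old")
    w.activeApps -= "app_old" // app_old has finished, so its directory is expired
    cleanup(w, () => w.register("app_new"))
  }
}
```

In this model, `run(cleanupKeysFirst)` deletes both `app_old` and `app_new`, while `run(cleanupDirsFirst)` deletes only `app_old`: a directory created after the listing is never considered, and one listed earlier is checked against keys read afterwards.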
### Why are the changes needed?
Bugfix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Passes GA and manual test.
Closes #1889 from zy-jordan/CELEBORN-881.
Lead-authored-by: hongzhaoyang <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Co-authored-by: Keyong Zhou <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit a77a8eb8fddb331aaaf6001356cd1eaab42ff4df)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../deploy/worker/storage/StorageManager.scala | 27 +++++++++++-----------
1 file changed, 14 insertions(+), 13 deletions(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index aa9b72a1f..64538f1fc 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -594,20 +594,21 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
TimeUnit.MINUTES)
private def cleanupExpiredAppDirs(): Unit = {
+ val diskInfoAndAppDirs = disksSnapshot()
+ .filter(_.status != DiskStatus.IO_HANG)
+ .map { case diskInfo =>
+ (diskInfo, diskInfo.dirs.filter(_.exists).flatMap(_.listFiles()))
+ }
val appIds = shuffleKeySet().asScala.map(key => Utils.splitShuffleKey(key)._1)
- disksSnapshot().filter(_.status != DiskStatus.IO_HANG).foreach { diskInfo =>
- diskInfo.dirs.foreach {
- case workingDir if workingDir.exists() =>
- workingDir.listFiles().foreach { appDir =>
- // Don't delete shuffleKey's data that exist correct shuffle file info.
- if (!appIds.contains(appDir.getName)) {
- val threadPool = diskOperators.get(diskInfo.mountPoint)
- deleteDirectory(appDir, threadPool)
- logInfo(s"Delete expired app dir $appDir.")
- }
- }
- // workingDir not exist when initializing worker on new disk
- case _ => // do nothing
+
+ diskInfoAndAppDirs.foreach { case (diskInfo, appDirs) =>
+ appDirs.foreach { appDir =>
+ // Don't delete shuffleKey's data that exist correct shuffle file info.
+ if (!appIds.contains(appDir.getName)) {
+ val threadPool = diskOperators.get(diskInfo.mountPoint)
+ deleteDirectory(appDir, threadPool)
+ logInfo(s"Delete expired app dir $appDir.")
+ }
}
}
}