This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 56bcbc026 [CELEBORN-1046] Add an expiration time configuration for app 
directory to clean up
56bcbc026 is described below

commit 56bcbc026bb3a4655a777d8fc84fa80285a772a9
Author: sunjunjie <[email protected]>
AuthorDate: Tue Oct 17 19:23:49 2023 +0800

    [CELEBORN-1046] Add an expiration time configuration for app directory to 
clean up
    
    ### What changes were proposed in this pull request?
    Add a worker configuration "celeborn.worker.storage.expireDirs.timeout" 
with a default value of 1h. This configuration is used to set the 
expiration time for app local directories.
    
    https://issues.apache.org/jira/browse/CELEBORN-1046
    ### Why are the changes needed?
    When Celeborn periodically deletes the directories of apps, it determines 
whether an app's directory needs to be deleted based on the shuffleKeySet in 
memory. However, this check may not accurately indicate that the app has 
completed and could potentially lead to the unintentional deletion of shuffle 
data.
    
    ### Does this PR introduce _any_ user-facing change?
    
    ### How was this patch tested?
    
    Closes #1998 from wilsonjie/CELEBORN-1046.
    
    Authored-by: sunjunjie <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 03498ce46b0900f7565c717565a6637e2d3341bb)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../main/scala/org/apache/celeborn/common/CelebornConf.scala   |  9 +++++++++
 docs/configuration/worker.md                                   |  1 +
 .../service/deploy/worker/storage/StorageManager.scala         | 10 ++++++----
 3 files changed, 16 insertions(+), 4 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 60a5f7f58..249cce8f7 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -907,6 +907,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable 
with Logging with Se
 
   def workerStorageBaseDirPrefix: String = get(WORKER_STORAGE_BASE_DIR_PREFIX)
   def workerStorageBaseDirNumber: Int = get(WORKER_STORAGE_BASE_DIR_COUNT)
+  def workerStorageExpireDirTimeout: Long = 
get(WORKER_STORAGE_EXPIRE_DIR_TIMEOUT)
   def creditStreamThreadsPerMountpoint: Int = 
get(WORKER_BUFFERSTREAM_THREADS_PER_MOUNTPOINT)
   def workerDirectMemoryRatioForReadBuffer: Double = 
get(WORKER_DIRECT_MEMORY_RATIO_FOR_READ_BUFFER)
   def partitionReadBuffersMin: Int = get(WORKER_PARTITION_READ_BUFFERS_MIN)
@@ -2050,6 +2051,14 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(16)
 
+  val WORKER_STORAGE_EXPIRE_DIR_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.worker.storage.expireDirs.timeout")
+      .categories("worker")
+      .version("0.3.2")
+      .doc(s"The timeout for a expire dirs to be deleted on disk.")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("1h")
+
   val HDFS_DIR: OptionalConfigEntry[String] =
     buildConf("celeborn.storage.hdfs.dir")
       .categories("worker", "master", "client")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 3cc87e5c4..1afaba6e3 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -99,6 +99,7 @@ license: |
 | celeborn.worker.storage.checkDirsEmpty.timeout | 1000ms | The wait time per 
retry for a worker to check if the working directory is cleaned up before 
registering with the master. | 0.3.0 | 
 | celeborn.worker.storage.dirs | &lt;undefined&gt; | Directory list to store 
shuffle data. It's recommended to configure one directory on each disk. Storage 
size limit can be set for each directory. For the sake of performance, there 
should be no more than 2 flush threads on the same disk partition if you are 
using HDD, and should be 8 or more flush threads on the same disk partition if 
you are using SSD. For example: 
`dir1[:capacity=][:disktype=][:flushthread=],dir2[:capacity=][:disktyp [...]
 | celeborn.worker.storage.disk.reserve.size | 5G | Celeborn worker reserved 
space for each disk. | 0.3.0 | 
+| celeborn.worker.storage.expireDirs.timeout | 1h | The timeout for a expire 
dirs to be deleted on disk. | 0.3.2 | 
 | celeborn.worker.storage.workingDir | celeborn-worker/shuffle_data | Worker's 
working dir path name. | 0.3.0 | 
 | celeborn.worker.writer.close.timeout | 120s | Timeout for a file writer to 
close | 0.2.0 | 
 | celeborn.worker.writer.create.maxAttempts | 3 | Retry count for a file 
writer to create if its creation was failed. | 0.2.0 | 
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 1147d1b34..c818a238a 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -55,6 +55,8 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
 
   val hasHDFSStorage = conf.hasHDFSStorage
 
+  val storageExpireDirTimeout = conf.workerStorageExpireDirTimeout
+
   // (deviceName -> deviceInfo) and (mount point -> diskInfo)
   val (deviceInfos, diskInfos) = {
     val workingDirInfos =
@@ -227,7 +229,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       saveCommittedFileInfoInterval,
       TimeUnit.MILLISECONDS)
   }
-  cleanupExpiredAppDirs()
+  cleanupExpiredAppDirs(System.currentTimeMillis() - storageExpireDirTimeout)
   if (!checkIfWorkingDirCleaned) {
     logWarning(
       "Worker still has residual files in the working directory before 
registering with Master, " +
@@ -588,7 +590,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
       override def run(): Unit = {
         try {
           // Clean up dirs which it's application is expired.
-          cleanupExpiredAppDirs()
+          cleanupExpiredAppDirs(System.currentTimeMillis() - 
storageExpireDirTimeout)
         } catch {
           case exception: Exception =>
             logWarning(s"Cleanup expired shuffle data exception: 
${exception.getMessage}")
@@ -599,7 +601,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     30,
     TimeUnit.MINUTES)
 
-  private def cleanupExpiredAppDirs(): Unit = {
+  private def cleanupExpiredAppDirs(expireDuration: Long): Unit = {
     val diskInfoAndAppDirs = disksSnapshot()
       .filter(_.status != DiskStatus.IO_HANG)
       .map { case diskInfo =>
@@ -610,7 +612,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     diskInfoAndAppDirs.foreach { case (diskInfo, appDirs) =>
       appDirs.foreach { appDir =>
         // Don't delete shuffleKey's data that exist correct shuffle file info.
-        if (!appIds.contains(appDir.getName)) {
+        if (!appIds.contains(appDir.getName) && appDir.lastModified() < 
expireDuration) {
           val threadPool = diskOperators.get(diskInfo.mountPoint)
           deleteDirectory(appDir, threadPool)
           logInfo(s"Delete expired app dir $appDir.")

Reply via email to