This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new efc9a875e [CELEBORN-863] Persist committed file infos to support 
worker recovery
efc9a875e is described below

commit efc9a875e906b67930c2713bb6a979164984be24
Author: mingji <[email protected]>
AuthorDate: Fri Aug 4 23:58:47 2023 +0800

    [CELEBORN-863] Persist committed file infos to support worker recovery
    
    ### What changes were proposed in this pull request?
    Support worker recovery after a worker crash when the worker has graceful 
shutdown enabled.
    
    1. Persist committed file infos to LevelDB.
    2. Load LevelDB when the worker starts.
    3. Clean expired file infos in LevelDB.
    
    ### Why are the changes needed?
    Ditto.
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    GA and cluster. Testing on a cluster showed that 8k file infos consume 
about 2MB of disk space; the disk space can be reclaimed soon after the 
shuffle expires.
    
    Closes #1779 from FMX/CELEBORN-863.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  | 21 ++++++
 docs/configuration/worker.md                       |  2 +
 .../service/deploy/worker/storage/FileWriter.java  | 16 +++++
 .../deploy/worker/storage/StorageManager.scala     | 80 +++++++++++++++++++---
 4 files changed, 109 insertions(+), 10 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index d7bddc40a..381c9da27 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -920,6 +920,10 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def workerGracefulShutdownPartitionSorterCloseAwaitTimeMs: Long =
     get(WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT)
   def workerGracefulShutdownFlusherShutdownTimeoutMs: Long = 
get(WORKER_FLUSHER_SHUTDOWN_TIMEOUT)
+  def workerGracefulShutdownSaveCommittedFileInfoInterval: Long =
+    get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL)
+  def workerGracefulShutdownSaveCommittedFileInfoSync: Boolean =
+    get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC)
 
   // //////////////////////////////////////////////////////
   //                      Flusher                        //
@@ -2246,6 +2250,23 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("3s")
 
+  val WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL: 
ConfigEntry[Long] =
+    
buildConf("celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval")
+      .categories("worker")
+      .doc("Interval for a Celeborn worker to flush committed file infos into 
Level DB.")
+      .version("0.3.1")
+      .timeConf(TimeUnit.MILLISECONDS)
+      .createWithDefaultString("5s")
+
+  val WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC: 
ConfigEntry[Boolean] =
+    buildConf("celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync")
+      .categories("worker")
+      .doc(
+        "Whether to call sync method to save committed file infos into Level 
DB to handle OS crash.")
+      .version("0.3.1")
+      .booleanConf
+      .createWithDefault(false)
+
   val WORKER_DISKTIME_SLIDINGWINDOW_SIZE: ConfigEntry[Int] =
     buildConf("celeborn.worker.flusher.diskTime.slidingWindow.size")
       
.withAlternative("celeborn.worker.flusher.avgFlushTime.slidingWindow.size")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index a76835b28..d57591485 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -56,6 +56,8 @@ license: |
 | celeborn.worker.graceful.shutdown.enabled | false | When true, during worker 
shutdown, the worker will wait for all released slots to be committed or 
destroyed. | 0.2.0 | 
 | celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout | 120s | 
The wait time of waiting for sorting partition files during worker graceful 
shutdown. | 0.2.0 | 
 | celeborn.worker.graceful.shutdown.recoverPath | &lt;tmp&gt;/recover | The 
path to store levelDB. | 0.2.0 | 
+| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval | 5s | 
Interval for a Celeborn worker to flush committed file infos into Level DB. | 
0.3.1 | 
+| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false | 
Whether to call sync method to save committed file infos into Level DB to 
handle OS crash. | 0.3.1 | 
 | celeborn.worker.graceful.shutdown.timeout | 600s | The worker's graceful 
shutdown timeout time. | 0.2.0 | 
 | celeborn.worker.monitor.disk.check.interval | 60s | Intervals between device 
monitor to check disk. | 0.3.0 | 
 | celeborn.worker.monitor.disk.check.timeout | 30s | Timeout time for worker 
check device status. | 0.3.0 | 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 06eeb0695..01f7d9705 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -81,6 +81,10 @@ public abstract class FileWriter implements DeviceObserver {
   protected boolean deleted = false;
   private RoaringBitmap mapIdBitMap = null;
   protected final FlushNotifier notifier = new FlushNotifier();
+  // It's only needed when graceful shutdown is enabled
+  private String shuffleKey;
+  private StorageManager storageManager;
+  private boolean workerGracefulShutdown;
 
   public FileWriter(
       FileInfo fileInfo,
@@ -97,6 +101,7 @@ public abstract class FileWriter implements DeviceObserver {
     this.flusher = flusher;
     this.flushWorkerIndex = flusher.getWorkerIndex();
     this.writerCloseTimeoutMs = conf.workerWriterCloseTimeoutMs();
+    this.workerGracefulShutdown = conf.workerGracefulShutdown();
     this.splitThreshold = splitThreshold;
     this.deviceMonitor = deviceMonitor;
     this.splitMode = splitMode;
@@ -288,6 +293,9 @@ public abstract class FileWriter implements DeviceObserver {
         deviceMonitor.unregisterFileWriter(this);
       }
     }
+    if (workerGracefulShutdown) {
+      storageManager.notifyFileInfoCommitted(shuffleKey, getFile().getName(), 
fileInfo);
+    }
     return fileInfo.getFileLength();
   }
 
@@ -440,4 +448,12 @@ public abstract class FileWriter implements DeviceObserver 
{
   public PartitionType getPartitionType() {
     return partitionType;
   }
+
+  public void setShuffleKey(String shuffleKey) {
+    this.shuffleKey = shuffleKey;
+  }
+
+  public void setStorageManager(StorageManager storageManager) {
+    this.storageManager = storageManager;
+  }
 }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index e180a785f..a1bca599f 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -21,7 +21,7 @@ import java.io.{File, IOException}
 import java.nio.charset.StandardCharsets
 import java.nio.file.{FileAlreadyExistsException, Files, Paths}
 import java.util
-import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService, 
ThreadPoolExecutor, TimeUnit}
 import java.util.concurrent.atomic.AtomicInteger
 import java.util.function.{BiConsumer, IntUnaryOperator}
 
@@ -31,7 +31,7 @@ import scala.concurrent.duration._
 import io.netty.buffer.PooledByteBufAllocator
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.fs.permission.FsPermission
-import org.iq80.leveldb.DB
+import org.iq80.leveldb.{DB, WriteOptions}
 
 import org.apache.celeborn.common.CelebornConf
 import org.apache.celeborn.common.exception.CelebornException
@@ -179,9 +179,16 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, 
FileInfo]]()
   private val RECOVERY_FILE_NAME = "recovery.ldb"
   private var db: DB = null
+  private var saveCommittedFileInfosExecutor: ScheduledExecutorService = _
+  private val saveCommittedFileInfoBySyncMethod =
+    conf.workerGracefulShutdownSaveCommittedFileInfoSync
+  private val saveCommittedFileInfoInterval =
+    conf.workerGracefulShutdownSaveCommittedFileInfoInterval
+  private var committedFileInfos: ConcurrentHashMap[String, 
ConcurrentHashMap[String, FileInfo]] = _
   // ShuffleClient can fetch data from a restarted worker only
   // when the worker's fetching port is stable.
-  if (conf.workerGracefulShutdown) {
+  val workerGracefulShutdown = conf.workerGracefulShutdown
+  if (workerGracefulShutdown) {
     try {
       val recoverFile = new File(conf.workerGracefulShutdownRecoverPath, 
RECOVERY_FILE_NAME)
       this.db = LevelDBProvider.initLevelDB(recoverFile, CURRENT_VERSION)
@@ -191,6 +198,28 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         logError("Init level DB failed:", e)
         this.db = null
     }
+    committedFileInfos =
+      JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String, 
FileInfo]]()
+    saveCommittedFileInfosExecutor =
+      ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+        "StorageManager-save-committed-fileinfo-thread")
+    saveCommittedFileInfosExecutor.scheduleAtFixedRate(
+      new Runnable {
+        override def run(): Unit = {
+          if (!committedFileInfos.isEmpty) {
+            logInfo(s"Save committed fileinfo with 
${committedFileInfos.size()} shuffle keys")
+            committedFileInfos.asScala.foreach { case (shuffleKey, files) =>
+              db.put(
+                dbShuffleKey(shuffleKey),
+                PbSerDeUtils.toPbFileInfoMap(files),
+                new WriteOptions().sync(saveCommittedFileInfoBySyncMethod))
+            }
+          }
+        }
+      },
+      saveCommittedFileInfoInterval,
+      saveCommittedFileInfoInterval,
+      TimeUnit.MILLISECONDS)
   }
   cleanupExpiredAppDirs()
   if (!checkIfWorkingDirCleaned) {
@@ -228,10 +257,19 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     }
   }
 
-  def updateFileInfosInDB(): Unit = {
-    fileInfos.asScala.foreach { case (shuffleKey, files) =>
+  def saveAllCommittedFileInfosToDB(): Unit = {
+    val runnables = saveCommittedFileInfosExecutor.shutdownNow()
+    // save committed fileinfo to DB should be done within the time of 
saveCommittedFileInfoInterval
+    runnables.asScala.foreach(_.wait(saveCommittedFileInfoInterval))
+    // graceful shutdown might be timed out, persist all committed fileinfos 
to levelDB
+    // final flush write through
+    committedFileInfos.asScala.foreach { case (shuffleKey, files) =>
       try {
-        db.put(dbShuffleKey(shuffleKey), PbSerDeUtils.toPbFileInfoMap(files))
+        db.put(
+          dbShuffleKey(shuffleKey),
+          PbSerDeUtils.toPbFileInfoMap(files),
+          // K8s container might gone
+          new WriteOptions().sync(true))
         logDebug(s"Update FileInfos into DB: ${shuffleKey} -> ${files}")
       } catch {
         case exception: Exception =>
@@ -313,7 +351,10 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
               rangeReadFilter)
           case _ => throw new UnsupportedOperationException(s"Not support 
$partitionType yet")
         }
-
+        if (workerGracefulShutdown) {
+          hdfsWriter.setStorageManager(this)
+          hdfsWriter.setShuffleKey(shuffleKey)
+        }
         fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, 
fileInfo)
         hdfsWriters.put(fileInfo.getFilePath, hdfsWriter)
         return hdfsWriter
@@ -357,6 +398,10 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
                 rangeReadFilter)
             case _ => throw new UnsupportedOperationException(s"Not support 
$partitionType yet")
           }
+          if (workerGracefulShutdown) {
+            fileWriter.setStorageManager(this)
+            fileWriter.setShuffleKey(shuffleKey)
+          }
           deviceMonitor.registerFileWriter(fileWriter)
           val map = workingDirWriters.computeIfAbsent(dir, 
workingDirWriterListFunc)
           map.put(fileInfo.getFilePath, fileWriter)
@@ -456,7 +501,9 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     isHdfsExpired
   }
 
-  def cleanupExpiredShuffleKey(expiredShuffleKeys: util.HashSet[String]): Unit 
= {
+  def cleanupExpiredShuffleKey(
+      expiredShuffleKeys: util.HashSet[String],
+      cleanDB: Boolean = true): Unit = {
     expiredShuffleKeys.asScala.foreach { shuffleKey =>
       logInfo(s"Cleanup expired shuffle $shuffleKey.")
       if (fileInfos.containsKey(shuffleKey)) {
@@ -486,6 +533,12 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
             case e: Exception => logWarning("Clean expired HDFS shuffle 
failed.", e)
           }
         }
+        if (workerGracefulShutdown) {
+          committedFileInfos.remove(shuffleKey)
+          if (cleanDB) {
+            db.delete(dbShuffleKey(shuffleKey))
+          }
+        }
       }
     }
   }
@@ -607,7 +660,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     if (db != null) {
       if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
         try {
-          updateFileInfosInDB()
+          saveAllCommittedFileInfosToDB()
           db.close()
         } catch {
           case exception: Exception =>
@@ -622,7 +675,7 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
     }
     if (null != diskOperators) {
       if (exitKind != CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
-        cleanupExpiredShuffleKey(shuffleKeySet())
+        cleanupExpiredShuffleKey(shuffleKeySet(), false)
       }
       ThreadUtils.parmap(
         diskOperators.asScala.toMap,
@@ -737,6 +790,13 @@ final private[worker] class StorageManager(conf: 
CelebornConf, workerSource: Abs
         }
     }
   }
+
+  def notifyFileInfoCommitted(
+      shuffleKey: String,
+      fileName: String,
+      fileInfo: FileInfo): Unit = {
+    committedFileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName, 
fileInfo)
+  }
 }
 
 object StorageManager {

Reply via email to