This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 82f026f18 [CELEBORN-863] Persist committed file infos to support
worker recovery
82f026f18 is described below
commit 82f026f187d0a0b7d281386c53bb9ef278d8d53e
Author: mingji <[email protected]>
AuthorDate: Fri Aug 4 23:58:47 2023 +0800
[CELEBORN-863] Persist committed file infos to support worker recovery
### What changes were proposed in this pull request?
Support worker recovery after a crash when graceful shutdown is enabled on the
workers.
1. Persist committed file info to LevelDB.
2. Load LevelDB when the worker starts.
3. Clean up expired file infos in LevelDB.
### Why are the changes needed?
See above: to let workers with graceful shutdown enabled recover after a crash.
### Does this PR introduce _any_ user-facing change?
NO.
### How was this patch tested?
GA and cluster. Testing on a cluster showed that 8k file infos consume about
2 MB of disk space; that space is reclaimed shortly after the shuffle
expires.
Closes #1779 from FMX/CELEBORN-863.
Authored-by: mingji <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
(cherry picked from commit efc9a875e906b67930c2713bb6a979164984be24)
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 21 ++++++
docs/configuration/worker.md | 2 +
.../service/deploy/worker/storage/FileWriter.java | 16 +++++
.../deploy/worker/storage/StorageManager.scala | 80 +++++++++++++++++++---
4 files changed, 109 insertions(+), 10 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index c906a518b..98ede7709 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -920,6 +920,10 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def workerGracefulShutdownPartitionSorterCloseAwaitTimeMs: Long =
get(WORKER_PARTITION_SORTER_SHUTDOWN_TIMEOUT)
def workerGracefulShutdownFlusherShutdownTimeoutMs: Long =
get(WORKER_FLUSHER_SHUTDOWN_TIMEOUT)
+ def workerGracefulShutdownSaveCommittedFileInfoInterval: Long =
+ get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL)
+ def workerGracefulShutdownSaveCommittedFileInfoSync: Boolean =
+ get(WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC)
// //////////////////////////////////////////////////////
// Flusher //
@@ -2246,6 +2250,23 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("3s")
+ val WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_INTERVAL:
ConfigEntry[Long] =
+
buildConf("celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval")
+ .categories("worker")
+ .doc("Interval for a Celeborn worker to flush committed file infos into
Level DB.")
+ .version("0.3.1")
+ .timeConf(TimeUnit.MILLISECONDS)
+ .createWithDefaultString("5s")
+
+ val WORKER_GRACEFUL_SHUTDOWN_SAVE_COMMITTED_FILEINFO_SYNC:
ConfigEntry[Boolean] =
+ buildConf("celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync")
+ .categories("worker")
+ .doc(
+ "Whether to call sync method to save committed file infos into Level
DB to handle OS crash.")
+ .version("0.3.1")
+ .booleanConf
+ .createWithDefault(false)
+
val WORKER_DISKTIME_SLIDINGWINDOW_SIZE: ConfigEntry[Int] =
buildConf("celeborn.worker.flusher.diskTime.slidingWindow.size")
.withAlternative("celeborn.worker.flusher.avgFlushTime.slidingWindow.size")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index a76835b28..d57591485 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -56,6 +56,8 @@ license: |
| celeborn.worker.graceful.shutdown.enabled | false | When true, during worker
shutdown, the worker will wait for all released slots to be committed or
destroyed. | 0.2.0 |
| celeborn.worker.graceful.shutdown.partitionSorter.shutdownTimeout | 120s |
The wait time of waiting for sorting partition files during worker graceful
shutdown. | 0.2.0 |
| celeborn.worker.graceful.shutdown.recoverPath | <tmp>/recover | The
path to store levelDB. | 0.2.0 |
+| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.interval | 5s |
Interval for a Celeborn worker to flush committed file infos into Level DB. |
0.3.1 |
+| celeborn.worker.graceful.shutdown.saveCommittedFileInfo.sync | false |
Whether to call sync method to save committed file infos into Level DB to
handle OS crash. | 0.3.1 |
| celeborn.worker.graceful.shutdown.timeout | 600s | The worker's graceful
shutdown timeout time. | 0.2.0 |
| celeborn.worker.monitor.disk.check.interval | 60s | Intervals between device
monitor to check disk. | 0.3.0 |
| celeborn.worker.monitor.disk.check.timeout | 30s | Timeout time for worker
check device status. | 0.3.0 |
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
index 06eeb0695..01f7d9705 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/FileWriter.java
@@ -81,6 +81,10 @@ public abstract class FileWriter implements DeviceObserver {
protected boolean deleted = false;
private RoaringBitmap mapIdBitMap = null;
protected final FlushNotifier notifier = new FlushNotifier();
+ // It's only needed when graceful shutdown is enabled
+ private String shuffleKey;
+ private StorageManager storageManager;
+ private boolean workerGracefulShutdown;
public FileWriter(
FileInfo fileInfo,
@@ -97,6 +101,7 @@ public abstract class FileWriter implements DeviceObserver {
this.flusher = flusher;
this.flushWorkerIndex = flusher.getWorkerIndex();
this.writerCloseTimeoutMs = conf.workerWriterCloseTimeoutMs();
+ this.workerGracefulShutdown = conf.workerGracefulShutdown();
this.splitThreshold = splitThreshold;
this.deviceMonitor = deviceMonitor;
this.splitMode = splitMode;
@@ -288,6 +293,9 @@ public abstract class FileWriter implements DeviceObserver {
deviceMonitor.unregisterFileWriter(this);
}
}
+ if (workerGracefulShutdown && storageManager != null) { // set via setStorageManager
+ storageManager.notifyFileInfoCommitted(shuffleKey, getFile().getName(),
fileInfo);
+ }
return fileInfo.getFileLength();
}
@@ -440,4 +448,12 @@ public abstract class FileWriter implements DeviceObserver
{
public PartitionType getPartitionType() {
return partitionType;
}
+
+ public void setShuffleKey(String shuffleKey) {
+ this.shuffleKey = shuffleKey;
+ }
+
+ public void setStorageManager(StorageManager storageManager) {
+ this.storageManager = storageManager;
+ }
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index e180a785f..a1bca599f 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -21,7 +21,7 @@ import java.io.{File, IOException}
import java.nio.charset.StandardCharsets
import java.nio.file.{FileAlreadyExistsException, Files, Paths}
import java.util
-import java.util.concurrent.{ConcurrentHashMap, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{ConcurrentHashMap, ScheduledExecutorService,
ThreadPoolExecutor, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.{BiConsumer, IntUnaryOperator}
@@ -31,7 +31,7 @@ import scala.concurrent.duration._
import io.netty.buffer.PooledByteBufAllocator
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.fs.permission.FsPermission
-import org.iq80.leveldb.DB
+import org.iq80.leveldb.{DB, WriteOptions}
import org.apache.celeborn.common.CelebornConf
import org.apache.celeborn.common.exception.CelebornException
@@ -179,9 +179,16 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String,
FileInfo]]()
private val RECOVERY_FILE_NAME = "recovery.ldb"
private var db: DB = null
+ private var saveCommittedFileInfosExecutor: ScheduledExecutorService = _
+ private val saveCommittedFileInfoBySyncMethod =
+ conf.workerGracefulShutdownSaveCommittedFileInfoSync
+ private val saveCommittedFileInfoInterval =
+ conf.workerGracefulShutdownSaveCommittedFileInfoInterval
+ private var committedFileInfos: ConcurrentHashMap[String,
ConcurrentHashMap[String, FileInfo]] = _
// ShuffleClient can fetch data from a restarted worker only
// when the worker's fetching port is stable.
- if (conf.workerGracefulShutdown) {
+ val workerGracefulShutdown = conf.workerGracefulShutdown
+ if (workerGracefulShutdown) {
try {
val recoverFile = new File(conf.workerGracefulShutdownRecoverPath,
RECOVERY_FILE_NAME)
this.db = LevelDBProvider.initLevelDB(recoverFile, CURRENT_VERSION)
@@ -191,6 +198,36 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
logError("Init level DB failed:", e)
this.db = null
}
+ committedFileInfos =
+ JavaUtils.newConcurrentHashMap[String, ConcurrentHashMap[String,
FileInfo]]()
+ saveCommittedFileInfosExecutor =
+ ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+ "StorageManager-save-committed-fileinfo-thread")
+ saveCommittedFileInfosExecutor.scheduleAtFixedRate(
+ new Runnable {
+ override def run(): Unit = {
+ // db is left null when LevelDB initialization fails; there is nothing to save into.
+ if (db != null && !committedFileInfos.isEmpty) {
+ logInfo(s"Save committed fileinfo with
${committedFileInfos.size()} shuffle keys")
+ committedFileInfos.asScala.foreach { case (shuffleKey, files) =>
+ // An exception escaping run() would silently cancel all later periodic saves,
+ // so failures are logged per shuffle key instead.
+ try {
+ db.put(
+ dbShuffleKey(shuffleKey),
+ PbSerDeUtils.toPbFileInfoMap(files),
+ new WriteOptions().sync(saveCommittedFileInfoBySyncMethod))
+ } catch {
+ case e: Exception =>
+ logError(s"Save committed fileinfo for $shuffleKey failed.", e)
+ }
+ }
+ }
+ }
+ },
+ saveCommittedFileInfoInterval,
+ saveCommittedFileInfoInterval,
+ TimeUnit.MILLISECONDS)
}
cleanupExpiredAppDirs()
if (!checkIfWorkingDirCleaned) {
@@ -228,10 +257,21 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
}
}
- def updateFileInfosInDB(): Unit = {
- fileInfos.asScala.foreach { case (shuffleKey, files) =>
+ def saveAllCommittedFileInfosToDB(): Unit = {
+ // Stop scheduling periodic saves; an in-flight save gets at most one interval to finish.
+ saveCommittedFileInfosExecutor.shutdown()
+ saveCommittedFileInfosExecutor.awaitTermination(
+ saveCommittedFileInfoInterval,
+ TimeUnit.MILLISECONDS)
+ // graceful shutdown might be timed out, persist all committed fileinfos
to levelDB
+ // final flush write through
+ committedFileInfos.asScala.foreach { case (shuffleKey, files) =>
try {
- db.put(dbShuffleKey(shuffleKey), PbSerDeUtils.toPbFileInfoMap(files))
+ db.put(
+ dbShuffleKey(shuffleKey),
+ PbSerDeUtils.toPbFileInfoMap(files),
+ // K8s container might gone
+ new WriteOptions().sync(true))
logDebug(s"Update FileInfos into DB: ${shuffleKey} -> ${files}")
} catch {
case exception: Exception =>
@@ -313,7 +351,10 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
rangeReadFilter)
case _ => throw new UnsupportedOperationException(s"Not support
$partitionType yet")
}
-
+ if (workerGracefulShutdown) {
+ hdfsWriter.setStorageManager(this)
+ hdfsWriter.setShuffleKey(shuffleKey)
+ }
fileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName,
fileInfo)
hdfsWriters.put(fileInfo.getFilePath, hdfsWriter)
return hdfsWriter
@@ -357,6 +398,10 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
rangeReadFilter)
case _ => throw new UnsupportedOperationException(s"Not support
$partitionType yet")
}
+ if (workerGracefulShutdown) {
+ fileWriter.setStorageManager(this)
+ fileWriter.setShuffleKey(shuffleKey)
+ }
deviceMonitor.registerFileWriter(fileWriter)
val map = workingDirWriters.computeIfAbsent(dir,
workingDirWriterListFunc)
map.put(fileInfo.getFilePath, fileWriter)
@@ -456,7 +501,9 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
isHdfsExpired
}
- def cleanupExpiredShuffleKey(expiredShuffleKeys: util.HashSet[String]): Unit
= {
+ def cleanupExpiredShuffleKey(
+ expiredShuffleKeys: util.HashSet[String],
+ cleanDB: Boolean = true): Unit = {
expiredShuffleKeys.asScala.foreach { shuffleKey =>
logInfo(s"Cleanup expired shuffle $shuffleKey.")
if (fileInfos.containsKey(shuffleKey)) {
@@ -486,6 +533,12 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
case e: Exception => logWarning("Clean expired HDFS shuffle
failed.", e)
}
}
+ if (workerGracefulShutdown) {
+ committedFileInfos.remove(shuffleKey)
+ if (cleanDB && db != null) { // db stays null if LevelDB init failed
+ db.delete(dbShuffleKey(shuffleKey))
+ }
+ }
}
}
}
@@ -607,7 +660,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
if (db != null) {
if (exitKind == CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
try {
- updateFileInfosInDB()
+ saveAllCommittedFileInfosToDB()
db.close()
} catch {
case exception: Exception =>
@@ -622,7 +675,7 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
}
if (null != diskOperators) {
if (exitKind != CelebornExitKind.WORKER_GRACEFUL_SHUTDOWN) {
- cleanupExpiredShuffleKey(shuffleKeySet())
+ cleanupExpiredShuffleKey(shuffleKeySet(), false)
}
ThreadUtils.parmap(
diskOperators.asScala.toMap,
@@ -737,6 +790,13 @@ final private[worker] class StorageManager(conf:
CelebornConf, workerSource: Abs
}
}
}
+
+ def notifyFileInfoCommitted(
+ shuffleKey: String,
+ fileName: String,
+ fileInfo: FileInfo): Unit = {
+ committedFileInfos.computeIfAbsent(shuffleKey, newMapFunc).put(fileName,
fileInfo)
+ }
}
object StorageManager {