This is an automated email from the ASF dual-hosted git repository.
nicholasjiang pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 6fc556531 [CELEBORN-2291] Support fsync on commit to ensure shuffle
data durability
6fc556531 is described below
commit 6fc5565319c6622aef8637a5cf5fa348fc763745
Author: Kartikay Bhutani <[email protected]>
AuthorDate: Fri Mar 27 10:35:55 2026 +0800
[CELEBORN-2291] Support fsync on commit to ensure shuffle data durability
### What changes were proposed in this pull request?
Add a new configuration `celeborn.worker.commitFiles.fsync` (default
`false`). When it is enabled, `LocalTierWriter.closeStreams()` calls
`FileChannel.force(false)` (fdatasync) before closing the channel.
### Why are the changes needed?
Without this, committed shuffle data can sit in the OS page cache before
the kernel flushes it to disk. A hard crash in that window loses data even
though Celeborn considers it committed. This option lets operators opt into
stronger durability guarantees.
### Does this PR resolve a correctness bug?
No. It adds an optional durability enhancement.
### Does this PR introduce _any_ user-facing change?
Yes. New configuration key `celeborn.worker.commitFiles.fsync` (boolean,
default `false`).
### How was this patch tested?
Existing unit tests. The configuration was verified via `ConfigurationSuite`.
For `LocalTierWriter`, a new test with fsync enabled was added and
`TierWriterSuite` was run.
Additional context:
[slack](https://apachecelebor-kw08030.slack.com/archives/C04B1FYS6SY/p1774259245973229)
Closes #3635 from kaybhutani/kartikay/fsync-on-commit.
Authored-by: Kartikay Bhutani <[email protected]>
Signed-off-by: 子懿 <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 11 +++++++++++
docs/configuration/worker.md | 1 +
.../service/deploy/worker/storage/TierWriter.scala | 11 ++++++++++-
.../deploy/worker/storage/TierWriterSuite.scala | 23 ++++++++++++++++++++--
4 files changed, 43 insertions(+), 3 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index b1a4a469e..a71723c9a 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -859,6 +859,7 @@ class CelebornConf(loadDefaults: Boolean) extends Cloneable
with Logging with Se
def workerCommitThreads: Int =
if (hasHDFSStorage) Math.max(128, get(WORKER_COMMIT_THREADS)) else
get(WORKER_COMMIT_THREADS)
def workerCommitFilesCheckInterval: Long =
get(WORKER_COMMIT_FILES_CHECK_INTERVAL)
+ def workerCommitFilesFsync: Boolean = get(WORKER_COMMIT_FILES_FSYNC)
def workerCleanThreads: Int = get(WORKER_CLEAN_THREADS)
def workerShuffleCommitTimeout: Long = get(WORKER_SHUFFLE_COMMIT_TIMEOUT)
def maxPartitionSizeToEstimate: Long =
@@ -3770,6 +3771,16 @@ object CelebornConf extends Logging {
.doc("Time length for a window about checking whether commit shuffle
data files finished.")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("100")
+ val WORKER_COMMIT_FILES_FSYNC: ConfigEntry[Boolean] =
+ buildConf("celeborn.worker.commitFiles.fsync")
+ .categories("worker")
+ .version("0.7.0")
+ .doc("Whether to fsync (fdatasync) shuffle data when committing. " +
+ "When enabled, each partition file is fsynced to disk before the
commit completes " +
+ "ensuring committed data survives OS crashes, hard reboots etc. " +
+ "Enabling ensures durability but can add some latency to commit
times.")
+ .booleanConf
+ .createWithDefault(false)
val WORKER_CLEAN_THREADS: ConfigEntry[Int] =
buildConf("celeborn.worker.clean.threads")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 0adfabe67..bb2cec89b 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -63,6 +63,7 @@ license: |
| celeborn.worker.clean.threads | 64 | false | Thread number of worker to
clean up expired shuffle keys. | 0.3.2 | |
| celeborn.worker.closeIdleConnections | false | false | Whether worker will
close idle connections. | 0.2.0 | |
| celeborn.worker.commitFiles.check.interval | 100 | false | Time length for a
window about checking whether commit shuffle data files finished. | 0.6.0 | |
+| celeborn.worker.commitFiles.fsync | false | false | Whether to fsync
(fdatasync) shuffle data when committing. When enabled, each partition file is
fsynced to disk before the commit completes ensuring committed data survives OS
crashes, hard reboots etc. Enabling ensures durability but can add some latency
to commit times. | 0.7.0 | |
| celeborn.worker.commitFiles.threads | 32 | false | Thread number of worker
to commit shuffle data files asynchronously. It's recommended to set at least
`128` when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 |
celeborn.worker.commit.threads |
| celeborn.worker.commitFiles.timeout | 120s | false | Timeout for a Celeborn
worker to commit files of a shuffle. It's recommended to set at least `240s`
when `HDFS` is enabled in `celeborn.storage.availableTypes`. | 0.3.0 |
celeborn.worker.shuffle.commit.timeout |
| celeborn.worker.congestionControl.check.interval | 10ms | false | Interval
of worker checks congestion if celeborn.worker.congestionControl.enabled is
true. | 0.3.2 | |
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 10bf94455..a04f1d676 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -418,6 +418,7 @@ class LocalTierWriter(
FileChannelUtils.createWritableFileChannel(diskFileInfo.getFilePath)
val gatherApiEnabled: Boolean = conf.workerFlusherLocalGatherAPIEnabled
+ val commitFilesFsync: Boolean = conf.workerCommitFilesFsync
override def needEvict(): Boolean = {
false
@@ -458,7 +459,15 @@ class LocalTierWriter(
}
override def closeStreams(): Unit = {
- channel.close()
+ if (channel != null) {
+ try {
+ if (commitFilesFsync) {
+ channel.force(false)
+ }
+ } finally {
+ channel.close()
+ }
+ }
}
override def notifyFileCommitted(): Unit =
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
index 43f999b29..ee6903ddf 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriterSuite.scala
@@ -175,8 +175,9 @@ class TierWriterSuite extends AnyFunSuite with
BeforeAndAfterEach {
}
- private def prepareLocalTierWriter(rangeFilter: Boolean): LocalTierWriter = {
- val celebornConf = new CelebornConf()
+ private def prepareLocalTierWriter(
+ rangeFilter: Boolean,
+ celebornConf: CelebornConf = new CelebornConf()): LocalTierWriter = {
celebornConf.set("celeborn.worker.memoryFileStorage.maxFileSize", "80k")
celebornConf.set("celeborn.client.shuffle.rangeReadFilter.enabled",
rangeFilter.toString)
val reduceFileMeta = new ReduceFileMeta(celebornConf.shuffleChunkSize)
@@ -314,4 +315,22 @@ class TierWriterSuite extends AnyFunSuite with
BeforeAndAfterEach {
localTierWriter.fileInfo.getFileMeta.asInstanceOf[ReduceFileMeta].getLastChunkOffset
== 10240)
}
+
+ test("test local tier writer with fsync enabled") {
+ val conf = new CelebornConf()
+ conf.set("celeborn.worker.commitFiles.fsync", "true")
+ val localTierWriter = prepareLocalTierWriter(false, conf)
+
+ assert(localTierWriter.commitFilesFsync === true)
+ for (i <- 1 to 10) {
+ localTierWriter.numPendingWrites.incrementAndGet()
+ localTierWriter.write(WriterUtils.generateSparkFormatData(
+ UnpooledByteBufAllocator.DEFAULT,
+ 0))
+ }
+
+ val fileLen = localTierWriter.close()
+ assert(fileLen == 10240)
+ assert(localTierWriter.closed === true)
+ }
}