This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new b8bb6c1e6 [CELEBORN-1931] use gather API for local flusher to optimize
write io pattern
b8bb6c1e6 is described below
commit b8bb6c1e6b5c77ca78e5e5d80826dfd488c419f3
Author: mingji <[email protected]>
AuthorDate: Mon Apr 21 13:57:40 2025 +0800
[CELEBORN-1931] use gather API for local flusher to optimize write io
pattern
### What changes were proposed in this pull request?
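Optimize the write I/O pattern of the Celeborn worker's local flusher: write all components of a flush buffer with a single gathering `FileChannel.write(ByteBuffer[])` call instead of one write call per component.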
### Why are the changes needed?
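A Celeborn flush buffer (a `CompositeByteBuf`) consists of many small components. Writing them one at a time issues at least one `FileChannel.write` call per component. The gathering API hands all components to the channel in one call, which reduces the number of write calls the flusher makes. The new behavior is controlled by the internal config `celeborn.worker.flusher.local.gatherAPI.enabled` (default `true`); setting it to `false` restores the per-component write loop.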
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Cluster tests.
Closes #3178 from FMX/b1931.
Authored-by: mingji <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 10 ++++++++++
.../service/deploy/worker/storage/FlushTask.scala | 18 +++++++++++++-----
.../service/deploy/worker/storage/TierWriter.scala | 4 +++-
3 files changed, 26 insertions(+), 6 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 59a085af0..44d76a1de 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1296,6 +1296,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def workerS3FlusherThreads: Int = get(WORKER_FLUSHER_S3_THREADS)
def workerOssFlusherThreads: Int = get(WORKER_FLUSHER_OSS_THREADS)
def workerCreateWriterMaxAttempts: Int =
get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
+ def workerFlusherLocalGatherAPIEnabled: Boolean =
get(WORKER_FLUSHER_LOCAL_GATHER_API_ENABLED)
// //////////////////////////////////////////////////////
// Disk Monitor //
@@ -3974,6 +3975,15 @@ object CelebornConf extends Logging {
.intConf
.createWithDefault(3)
+ val WORKER_FLUSHER_LOCAL_GATHER_API_ENABLED: ConfigEntry[Boolean] =
+ buildConf("celeborn.worker.flusher.local.gatherAPI.enabled")
+ .internal
+ .categories("worker")
+ .version("0.6.0")
+ .doc("Worker will use gather API if this config is true.")
+ .booleanConf
+ .createWithDefault(true)
+
val WORKER_PARTITION_SORTER_DIRECT_MEMORY_RATIO_THRESHOLD:
ConfigEntry[Double] =
buildConf("celeborn.worker.partitionSorter.directMemoryRatioThreshold")
.categories("worker")
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 736e1ff4a..16a11c088 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -37,15 +37,23 @@ private[worker] class LocalFlushTask(
buffer: CompositeByteBuf,
fileChannel: FileChannel,
notifier: FlushNotifier,
- keepBuffer: Boolean) extends FlushTask(buffer, notifier, keepBuffer) {
+ keepBuffer: Boolean,
+ gatherApiEnabled: Boolean) extends FlushTask(buffer, notifier, keepBuffer)
{
override def flush(): Unit = {
val buffers = buffer.nioBuffers()
- for (buffer <- buffers) {
- while (buffer.hasRemaining) {
- fileChannel.write(buffer)
+ if (gatherApiEnabled) {
+ val readableBytes = buffer.readableBytes()
+ var written = 0L
+ do {
+ written = fileChannel.write(buffers) + written
+ } while (written != readableBytes)
+ } else {
+ for (buffer <- buffers) {
+ while (buffer.hasRemaining) {
+ fileChannel.write(buffer)
+ }
}
}
-
// TODO: force flush file channel in scenarios where the upstream task
writes and the downstream task reads simultaneously, such as flink hybrid
shuffle.
}
}
diff --git
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 8ac98aae8..ab7c39a66 100644
---
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -395,13 +395,15 @@ class LocalTierWriter(
private lazy val channel: FileChannel =
FileChannelUtils.createWritableFileChannel(diskFileInfo.getFilePath)
+ val gatherApiEnabled: Boolean = conf.workerFlusherLocalGatherAPIEnabled
+
override def needEvict(): Boolean = {
false
}
override def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean):
FlushTask = {
notifier.numPendingFlushes.incrementAndGet()
- new LocalFlushTask(flushBuffer, channel, notifier, true)
+ new LocalFlushTask(flushBuffer, channel, notifier, true, gatherApiEnabled)
}
override def writeInternal(buf: ByteBuf): Unit = {