This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new b8bb6c1e6 [CELEBORN-1931] use gather API for local flusher to optimize 
write io pattern
b8bb6c1e6 is described below

commit b8bb6c1e6b5c77ca78e5e5d80826dfd488c419f3
Author: mingji <[email protected]>
AuthorDate: Mon Apr 21 13:57:40 2025 +0800

    [CELEBORN-1931] use gather API for local flusher to optimize write io 
pattern
    
    ### What changes were proposed in this pull request?
    Use the gather write API in the worker's local flusher to optimize its IO pattern.
    
    ### Why are the changes needed?
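    The composite ByteBuf flushed by a Celeborn worker is made up of many small
    buffers. Passing them all to a single gather write avoids looping over the
    small buffers and issuing a separate write call for each one.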
    
    ### Does this PR introduce _any_ user-facing change?
    NO.
    
    ### How was this patch tested?
    Cluster tests.
    
    Closes #3178 from FMX/b1931.
    
    Authored-by: mingji <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala      | 10 ++++++++++
 .../service/deploy/worker/storage/FlushTask.scala      | 18 +++++++++++++-----
 .../service/deploy/worker/storage/TierWriter.scala     |  4 +++-
 3 files changed, 26 insertions(+), 6 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 59a085af0..44d76a1de 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1296,6 +1296,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def workerS3FlusherThreads: Int = get(WORKER_FLUSHER_S3_THREADS)
   def workerOssFlusherThreads: Int = get(WORKER_FLUSHER_OSS_THREADS)
   def workerCreateWriterMaxAttempts: Int = 
get(WORKER_WRITER_CREATE_MAX_ATTEMPTS)
+  def workerFlusherLocalGatherAPIEnabled: Boolean = 
get(WORKER_FLUSHER_LOCAL_GATHER_API_ENABLED)
 
   // //////////////////////////////////////////////////////
   //                    Disk Monitor                     //
@@ -3974,6 +3975,15 @@ object CelebornConf extends Logging {
       .intConf
       .createWithDefault(3)
 
+  val WORKER_FLUSHER_LOCAL_GATHER_API_ENABLED: ConfigEntry[Boolean] =
+    buildConf("celeborn.worker.flusher.local.gatherAPI.enabled")
+      .internal
+      .categories("worker")
+      .version("0.6.0")
+      .doc("Worker will use gather API if this config is true.")
+      .booleanConf
+      .createWithDefault(true)
+
   val WORKER_PARTITION_SORTER_DIRECT_MEMORY_RATIO_THRESHOLD: 
ConfigEntry[Double] =
     buildConf("celeborn.worker.partitionSorter.directMemoryRatioThreshold")
       .categories("worker")
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
index 736e1ff4a..16a11c088 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/FlushTask.scala
@@ -37,15 +37,23 @@ private[worker] class LocalFlushTask(
     buffer: CompositeByteBuf,
     fileChannel: FileChannel,
     notifier: FlushNotifier,
-    keepBuffer: Boolean) extends FlushTask(buffer, notifier, keepBuffer) {
+    keepBuffer: Boolean,
+    gatherApiEnabled: Boolean) extends FlushTask(buffer, notifier, keepBuffer) 
{
   override def flush(): Unit = {
     val buffers = buffer.nioBuffers()
-    for (buffer <- buffers) {
-      while (buffer.hasRemaining) {
-        fileChannel.write(buffer)
+    if (gatherApiEnabled) {
+      val readableBytes = buffer.readableBytes()
+      var written = 0L
+      do {
+        written = fileChannel.write(buffers) + written
+      } while (written != readableBytes)
+    } else {
+      for (buffer <- buffers) {
+        while (buffer.hasRemaining) {
+          fileChannel.write(buffer)
+        }
       }
     }
-
     // TODO: force flush file channel in scenarios where the upstream task 
writes and the downstream task reads simultaneously, such as flink hybrid 
shuffle.
   }
 }
diff --git 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 8ac98aae8..ab7c39a66 100644
--- 
a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ 
b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -395,13 +395,15 @@ class LocalTierWriter(
   private lazy val channel: FileChannel =
     FileChannelUtils.createWritableFileChannel(diskFileInfo.getFilePath)
 
+  val gatherApiEnabled: Boolean = conf.workerFlusherLocalGatherAPIEnabled
+
   override def needEvict(): Boolean = {
     false
   }
 
   override def genFlushTask(finalFlush: Boolean, keepBuffer: Boolean): 
FlushTask = {
     notifier.numPendingFlushes.incrementAndGet()
-    new LocalFlushTask(flushBuffer, channel, notifier, true)
+    new LocalFlushTask(flushBuffer, channel, notifier, true, gatherApiEnabled)
   }
 
   override def writeInternal(buf: ByteBuf): Unit = {

Reply via email to