This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.6 by this push:
     new 1641e4dd2 [CELEBORN-2192] ReadBufferDispatcher should add timeout 
constraints to fast fail in case of timeout
1641e4dd2 is described below

commit 1641e4dd267d45b34d4d323924c9fb22d6557313
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 5 10:24:38 2025 +0800

    [CELEBORN-2192] ReadBufferDispatcher should add timeout constraints to fast 
fail in case of timeout
    
    `ReadBufferDispatcher` should enforce a timeout when processing a read 
buffer request, so that the request fails fast instead of waiting indefinitely.
    
    Setting `celeborn.worker.directMemoryRatioForReadBuffer` to a small ratio 
may currently result in a backlog of read buffer requests in 
`ReadBufferDispatcher`, which causes running Flink jobs to stall. 
`ReadBufferDispatcher` should enforce a processing timeout so that such 
requests fail fast, avoiding long wait times for the client.
    
    No.
    
    Introduces the `celeborn.worker.readBuffer.processTimeout` config (default 
`600s`), which specifies the timeout for the buffer dispatcher to process a 
read buffer request.
    
    `ReadBufferDispactherSuite#[CELEBORN-2192] ReadBufferDispatcher should add 
timeout constraints to fast fail in case of timeout`
    
    Closes #3525 from SteNicholas/CELEBORN-2192.
    
    Lead-authored-by: SteNicholas <[email protected]>
    Co-authored-by: 子懿 <[email protected]>
    Signed-off-by: Shuang <[email protected]>
    (cherry picked from commit 76199454ca28f1d0f390b06afc2bc87e61a3b3ed)
    Signed-off-by: Shuang <[email protected]>
---
 .../org/apache/celeborn/common/CelebornConf.scala  |  9 +++++
 docs/configuration/worker.md                       |  1 +
 .../deploy/worker/memory/ReadBufferDispatcher.java | 40 +++++++++++++++++-----
 .../deploy/memory/ReadBufferDispactherSuite.scala  | 36 ++++++++++++++++---
 4 files changed, 74 insertions(+), 12 deletions(-)

diff --git 
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala 
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 2189b0a79..3345792a5 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1277,6 +1277,7 @@ class CelebornConf(loadDefaults: Boolean) extends 
Cloneable with Logging with Se
   def partitionReadBuffersMin: Int = get(WORKER_PARTITION_READ_BUFFERS_MIN)
   def partitionReadBuffersMax: Int = get(WORKER_PARTITION_READ_BUFFERS_MAX)
   def readBufferAllocationWait: Long = get(WORKER_READBUFFER_ALLOCATIONWAIT)
+  def readBufferProcessTimeout: Long = get(WORKER_READBUFFER_PROCESS_TIMEOUT)
   def readBufferTargetRatio: Double = get(WORKER_READBUFFER_TARGET_RATIO)
   def readBufferTargetUpdateInterval: Long = 
get(WORKER_READBUFFER_TARGET_UPDATE_INTERVAL)
   def readBufferTargetNotifyThreshold: Long = 
get(WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD)
@@ -4333,6 +4334,14 @@ object CelebornConf extends Logging {
       .timeConf(TimeUnit.MILLISECONDS)
       .createWithDefaultString("50ms")
 
+  val WORKER_READBUFFER_PROCESS_TIMEOUT: ConfigEntry[Long] =
+    buildConf("celeborn.worker.readBuffer.processTimeout")
+      .categories("worker")
+      .version("0.6.2")
+      .doc("Timeout for buffer dispatcher to process a read buffer request.")
+      .timeConf(TimeUnit.NANOSECONDS)
+      .createWithDefaultString("600s")
+
   val WORKER_READBUFFER_TARGET_RATIO: ConfigEntry[Double] =
     buildConf("celeborn.worker.readBuffer.target.ratio")
       .categories("worker")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 0c623fccf..be6afdff8 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -162,6 +162,7 @@ license: |
 | celeborn.worker.push.io.threads | &lt;undefined&gt; | false | Netty IO 
thread number of worker to handle client push data. The default threads number 
is the number of flush thread. | 0.2.0 |  | 
 | celeborn.worker.push.port | 0 | false | Server port for Worker to receive 
push data request from ShuffleClient. | 0.2.0 |  | 
 | celeborn.worker.readBuffer.allocationWait | 50ms | false | The time to wait 
when buffer dispatcher can not allocate a buffer. | 0.3.0 |  | 
+| celeborn.worker.readBuffer.processTimeout | 600s | false | Timeout for 
buffer dispatcher to process a read buffer request. | 0.6.2 |  | 
 | celeborn.worker.readBuffer.target.changeThreshold | 1mb | false | The target 
ratio for pre read memory usage. | 0.3.0 |  | 
 | celeborn.worker.readBuffer.target.ratio | 0.9 | false | The target ratio for 
read ahead buffer's memory usage. | 0.3.0 |  | 
 | celeborn.worker.readBuffer.target.updateInterval | 100ms | false | The 
interval for memory manager to calculate new read buffer's target memory. | 
0.3.0 |  | 
diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index aa862f57b..b5f3c8bca 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.atomic.LongAdder;
 
@@ -45,12 +46,15 @@ public class ReadBufferDispatcher {
   private final ByteBufAllocator readBufferAllocator;
   private final LongAdder allocatedReadBuffers = new LongAdder();
   private final long readBufferAllocationWait;
+  private final long readBufferProcessTimeout;
   @VisibleForTesting public volatile boolean stopFlag = false;
-  @VisibleForTesting public final AtomicReference<Thread> dispatcherThread;
+  @VisibleForTesting public AtomicReference<Thread> dispatcherThread;
+  private ScheduledExecutorService checkAliveThread;
 
   public ReadBufferDispatcher(
       MemoryManager memoryManager, CelebornConf conf, AbstractSource source) {
     readBufferAllocationWait = conf.readBufferAllocationWait();
+    readBufferProcessTimeout = conf.readBufferProcessTimeout();
     long checkThreadInterval = conf.readBufferDispatcherCheckThreadInterval();
     // readBuffer is not a module name, it's a placeholder.
     readBufferAllocator =
@@ -62,7 +66,7 @@ public class ReadBufferDispatcher {
     dispatcherThread.get().start();
 
     if (checkThreadInterval > 0) {
-      ScheduledExecutorService checkAliveThread =
+      checkAliveThread =
           
ThreadUtils.newDaemonSingleThreadScheduledExecutor("read-buffer-dispatcher-checker");
       checkAliveThread.scheduleWithFixedDelay(
           () -> {
@@ -104,6 +108,14 @@ public class ReadBufferDispatcher {
   public void close() {
     stopFlag = true;
     requests.clear();
+    if (dispatcherThread != null) {
+      dispatcherThread.get().interrupt();
+      dispatcherThread = null;
+    }
+    if (checkAliveThread != null) {
+      ThreadUtils.shutdown(checkAliveThread);
+      checkAliveThread = null;
+    }
   }
 
   private class DispatcherRunnable implements Runnable {
@@ -144,10 +156,11 @@ public class ReadBufferDispatcher {
       }
     }
 
-    void processBufferRequest(ReadBufferRequest request, List<ByteBuf> 
buffers) {
+    void processBufferRequest(ReadBufferRequest request, List<ByteBuf> 
buffers) throws Exception {
       long start = System.nanoTime();
+      int number = request.getNumber();
       int bufferSize = request.getBufferSize();
-      while (buffers.size() < request.getNumber()) {
+      while (buffers.size() < number && System.nanoTime() - start <= 
readBufferProcessTimeout) {
         if (memoryManager.readBufferAvailable(bufferSize)) {
           ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
           buffers.add(buf);
@@ -163,10 +176,21 @@ public class ReadBufferDispatcher {
           }
         }
       }
-      long end = System.nanoTime();
-      logger.debug(
-          "process read buffer request using {} ms", 
TimeUnit.NANOSECONDS.toMillis(end - start));
-      request.getBufferListener().notifyBuffers(buffers, null);
+      if (buffers.size() == number) {
+        logger.debug(
+            "Processing {} read buffer (size: {}) request uses {} ms.",
+            number,
+            bufferSize,
+            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+        request.getBufferListener().notifyBuffers(buffers, null);
+      } else {
+        // The buffer dispatcher should add timeout constraints to fast fail 
in case of timeout,
+        // avoiding long wait times for client.
+        throw new TimeoutException(
+            String.format(
+                "Process %d read buffer (size: %d) request timeout after %d 
ms.",
+                number, bufferSize, 
TimeUnit.NANOSECONDS.toMillis(readBufferProcessTimeout)));
+      }
     }
   }
 }
diff --git 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
index f2e3ade15..b019a60f1 100644
--- 
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
+++ 
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
@@ -18,7 +18,7 @@
 package org.apache.celeborn.service.deploy.memory
 
 import java.util
-import java.util.concurrent.{CompletableFuture, TimeUnit}
+import java.util.concurrent.{CompletableFuture, TimeoutException, TimeUnit}
 
 import io.netty.buffer.ByteBuf
 import org.mockito.ArgumentMatchers.anyInt
@@ -35,8 +35,8 @@ class ReadBufferDispactherSuite extends CelebornFunSuite {
   test("[CELEBORN-1580] Test ReadBufferDispacther notify exception to 
listener") {
     val mockedMemoryManager = mock(classOf[MemoryManager])
     when(mockedMemoryManager.readBufferAvailable(anyInt())).thenAnswer(
-      new Answer[Int] {
-        override def answer(invocation: InvocationOnMock): Int = {
+      new Answer[Boolean] {
+        override def answer(invocation: InvocationOnMock): Boolean = {
           throw new RuntimeException("throw exception for test")
         }
       })
@@ -66,7 +66,7 @@ class ReadBufferDispactherSuite extends CelebornFunSuite {
   test("Test check thread alive") {
     val mockedMemoryManager = mock(classOf[MemoryManager])
     val conf = new CelebornConf()
-    
conf.set("celeborn.worker.readBufferDispatcherThreadWatchdog.checkInterval", 
"100ms")
+    conf.set(CelebornConf.WORKER_READBUFFER_CHECK_THREAD_INTERVAL.key, "100ms")
     val readBufferDispatcher = new ReadBufferDispatcher(mockedMemoryManager, 
conf, null)
     val threadId1 = readBufferDispatcher.dispatcherThread.get().getId
     readBufferDispatcher.stopFlag = true
@@ -75,4 +75,32 @@ class ReadBufferDispactherSuite extends CelebornFunSuite {
     val threadId2 = readBufferDispatcher.dispatcherThread.get().getId
     assert(threadId1 != threadId2)
   }
+
+  test("[CELEBORN-2192] ReadBufferDispatcher should add timeout constraints to 
fast fail in case of timeout") {
+    val memoryManager = mock(classOf[MemoryManager])
+    val readBufferDispatcher = new ReadBufferDispatcher(
+      memoryManager,
+      new CelebornConf().set(
+        CelebornConf.WORKER_READBUFFER_PROCESS_TIMEOUT.key,
+        CelebornConf.WORKER_READBUFFER_ALLOCATIONWAIT.defaultValueString),
+      null)
+    when(memoryManager.readBufferAvailable(anyInt())).thenAnswer(new 
Answer[Boolean] {
+      override def answer(invocationOnMock: InvocationOnMock): Boolean = false
+    })
+    val completableFuture = new CompletableFuture[Void]()
+    val readBufferRequest = new ReadBufferRequest(
+      Integer.MAX_VALUE,
+      Integer.MAX_VALUE,
+      new ReadBufferListener {
+        override def notifyBuffers(
+            allocatedBuffers: util.List[ByteBuf],
+            throwable: Throwable): Unit = {
+          assert(throwable != null)
+          assert(throwable.isInstanceOf[TimeoutException])
+          completableFuture.complete(null);
+        }
+      })
+    readBufferDispatcher.addBufferRequest(readBufferRequest)
+    completableFuture.get(5, TimeUnit.SECONDS)
+  }
 }

Reply via email to