This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.6
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.6 by this push:
new 1641e4dd2 [CELEBORN-2192] ReadBufferDispatcher should add timeout
constraints to fast fail in case of timeout
1641e4dd2 is described below
commit 1641e4dd267d45b34d4d323924c9fb22d6557313
Author: SteNicholas <[email protected]>
AuthorDate: Wed Nov 5 10:24:38 2025 +0800
[CELEBORN-2192] ReadBufferDispatcher should add timeout constraints to fast
fail in case of timeout
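What changes were proposed in this pull request?
`ReadBufferDispatcher` now enforces a timeout while processing a read buffer
request, so that the request fails fast with a `TimeoutException` instead of
waiting indefinitely for buffers.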
Why are the changes needed?
Setting `celeborn.worker.directMemoryRatioForReadBuffer` to a small ratio can
currently cause a backlog of read buffer requests in `ReadBufferDispatcher`,
which causes running Flink jobs to stall. `ReadBufferDispatcher` should
enforce a timeout so that such requests fail fast, avoiding long wait times
for the client.
Does this PR resolve a correctness bug?
No.
Does this PR introduce any user-facing change?
Yes. It introduces the `celeborn.worker.readBuffer.processTimeout` config
(default `600s`), which specifies the timeout for the buffer dispatcher to
process a read buffer request.
How was this patch tested?
Added the unit test `ReadBufferDispactherSuite#[CELEBORN-2192]
ReadBufferDispatcher should add timeout constraints to fast fail in case of
timeout`.
Closes #3525 from SteNicholas/CELEBORN-2192.
Lead-authored-by: SteNicholas <[email protected]>
Co-authored-by: 子懿 <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 76199454ca28f1d0f390b06afc2bc87e61a3b3ed)
Signed-off-by: Shuang <[email protected]>
---
.../org/apache/celeborn/common/CelebornConf.scala | 9 +++++
docs/configuration/worker.md | 1 +
.../deploy/worker/memory/ReadBufferDispatcher.java | 40 +++++++++++++++++-----
.../deploy/memory/ReadBufferDispactherSuite.scala | 36 ++++++++++++++++---
4 files changed, 74 insertions(+), 12 deletions(-)
diff --git
a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
index 2189b0a79..3345792a5 100644
--- a/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
+++ b/common/src/main/scala/org/apache/celeborn/common/CelebornConf.scala
@@ -1277,6 +1277,7 @@ class CelebornConf(loadDefaults: Boolean) extends
Cloneable with Logging with Se
def partitionReadBuffersMin: Int = get(WORKER_PARTITION_READ_BUFFERS_MIN)
def partitionReadBuffersMax: Int = get(WORKER_PARTITION_READ_BUFFERS_MAX)
def readBufferAllocationWait: Long = get(WORKER_READBUFFER_ALLOCATIONWAIT)
+ def readBufferProcessTimeout: Long = get(WORKER_READBUFFER_PROCESS_TIMEOUT)
def readBufferTargetRatio: Double = get(WORKER_READBUFFER_TARGET_RATIO)
def readBufferTargetUpdateInterval: Long =
get(WORKER_READBUFFER_TARGET_UPDATE_INTERVAL)
def readBufferTargetNotifyThreshold: Long =
get(WORKER_READBUFFER_TARGET_NOTIFY_THRESHOLD)
@@ -4333,6 +4334,14 @@ object CelebornConf extends Logging {
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("50ms")
+ val WORKER_READBUFFER_PROCESS_TIMEOUT: ConfigEntry[Long] =
+ buildConf("celeborn.worker.readBuffer.processTimeout")
+ .categories("worker")
+ .version("0.6.2")
+ .doc("Timeout for buffer dispatcher to process a read buffer request.")
+ .timeConf(TimeUnit.NANOSECONDS)
+ .createWithDefaultString("600s")
+
val WORKER_READBUFFER_TARGET_RATIO: ConfigEntry[Double] =
buildConf("celeborn.worker.readBuffer.target.ratio")
.categories("worker")
diff --git a/docs/configuration/worker.md b/docs/configuration/worker.md
index 0c623fccf..be6afdff8 100644
--- a/docs/configuration/worker.md
+++ b/docs/configuration/worker.md
@@ -162,6 +162,7 @@ license: |
| celeborn.worker.push.io.threads | <undefined> | false | Netty IO
thread number of worker to handle client push data. The default threads number
is the number of flush thread. | 0.2.0 | |
| celeborn.worker.push.port | 0 | false | Server port for Worker to receive
push data request from ShuffleClient. | 0.2.0 | |
| celeborn.worker.readBuffer.allocationWait | 50ms | false | The time to wait
when buffer dispatcher can not allocate a buffer. | 0.3.0 | |
+| celeborn.worker.readBuffer.processTimeout | 600s | false | Timeout for
buffer dispatcher to process a read buffer request. | 0.6.2 | |
| celeborn.worker.readBuffer.target.changeThreshold | 1mb | false | The target
ratio for pre read memory usage. | 0.3.0 | |
| celeborn.worker.readBuffer.target.ratio | 0.9 | false | The target ratio for
read ahead buffer's memory usage. | 0.3.0 | |
| celeborn.worker.readBuffer.target.updateInterval | 100ms | false | The
interval for memory manager to calculate new read buffer's target memory. |
0.3.0 | |
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index aa862f57b..b5f3c8bca 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -22,6 +22,7 @@ import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
@@ -45,12 +46,15 @@ public class ReadBufferDispatcher {
private final ByteBufAllocator readBufferAllocator;
private final LongAdder allocatedReadBuffers = new LongAdder();
private final long readBufferAllocationWait;
+ private final long readBufferProcessTimeout;
@VisibleForTesting public volatile boolean stopFlag = false;
- @VisibleForTesting public final AtomicReference<Thread> dispatcherThread;
+ @VisibleForTesting public AtomicReference<Thread> dispatcherThread;
+ private ScheduledExecutorService checkAliveThread;
public ReadBufferDispatcher(
MemoryManager memoryManager, CelebornConf conf, AbstractSource source) {
readBufferAllocationWait = conf.readBufferAllocationWait();
+ readBufferProcessTimeout = conf.readBufferProcessTimeout();
long checkThreadInterval = conf.readBufferDispatcherCheckThreadInterval();
// readBuffer is not a module name, it's a placeholder.
readBufferAllocator =
@@ -62,7 +66,7 @@ public class ReadBufferDispatcher {
dispatcherThread.get().start();
if (checkThreadInterval > 0) {
- ScheduledExecutorService checkAliveThread =
+ checkAliveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("read-buffer-dispatcher-checker");
checkAliveThread.scheduleWithFixedDelay(
() -> {
@@ -104,6 +108,14 @@ public class ReadBufferDispatcher {
public void close() {
stopFlag = true;
requests.clear();
+ if (dispatcherThread != null) {
+ dispatcherThread.get().interrupt();
+ dispatcherThread = null;
+ }
+ if (checkAliveThread != null) {
+ ThreadUtils.shutdown(checkAliveThread);
+ checkAliveThread = null;
+ }
}
private class DispatcherRunnable implements Runnable {
@@ -144,10 +156,11 @@ public class ReadBufferDispatcher {
}
}
- void processBufferRequest(ReadBufferRequest request, List<ByteBuf>
buffers) {
+ void processBufferRequest(ReadBufferRequest request, List<ByteBuf>
buffers) throws Exception {
long start = System.nanoTime();
+ int number = request.getNumber();
int bufferSize = request.getBufferSize();
- while (buffers.size() < request.getNumber()) {
+ while (buffers.size() < number && System.nanoTime() - start <=
readBufferProcessTimeout) {
if (memoryManager.readBufferAvailable(bufferSize)) {
ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
buffers.add(buf);
@@ -163,10 +176,21 @@ public class ReadBufferDispatcher {
}
}
}
- long end = System.nanoTime();
- logger.debug(
- "process read buffer request using {} ms",
TimeUnit.NANOSECONDS.toMillis(end - start));
- request.getBufferListener().notifyBuffers(buffers, null);
+ if (buffers.size() == number) {
+ logger.debug(
+ "Processing {} read buffer (size: {}) request uses {} ms.",
+ number,
+ bufferSize,
+ TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start));
+ request.getBufferListener().notifyBuffers(buffers, null);
+ } else {
+ // The buffer dispatcher should add timeout constraints to fast fail
in case of timeout,
+ // avoiding long wait times for client.
+ throw new TimeoutException(
+ String.format(
+ "Process %d read buffer (size: %d) request timeout after %d
ms.",
+ number, bufferSize,
TimeUnit.NANOSECONDS.toMillis(readBufferProcessTimeout)));
+ }
}
}
}
diff --git
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
index f2e3ade15..b019a60f1 100644
---
a/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
+++
b/worker/src/test/scala/org/apache/celeborn/service/deploy/memory/ReadBufferDispactherSuite.scala
@@ -18,7 +18,7 @@
package org.apache.celeborn.service.deploy.memory
import java.util
-import java.util.concurrent.{CompletableFuture, TimeUnit}
+import java.util.concurrent.{CompletableFuture, TimeoutException, TimeUnit}
import io.netty.buffer.ByteBuf
import org.mockito.ArgumentMatchers.anyInt
@@ -35,8 +35,8 @@ class ReadBufferDispactherSuite extends CelebornFunSuite {
test("[CELEBORN-1580] Test ReadBufferDispacther notify exception to
listener") {
val mockedMemoryManager = mock(classOf[MemoryManager])
when(mockedMemoryManager.readBufferAvailable(anyInt())).thenAnswer(
- new Answer[Int] {
- override def answer(invocation: InvocationOnMock): Int = {
+ new Answer[Boolean] {
+ override def answer(invocation: InvocationOnMock): Boolean = {
throw new RuntimeException("throw exception for test")
}
})
@@ -66,7 +66,7 @@ class ReadBufferDispactherSuite extends CelebornFunSuite {
test("Test check thread alive") {
val mockedMemoryManager = mock(classOf[MemoryManager])
val conf = new CelebornConf()
-
conf.set("celeborn.worker.readBufferDispatcherThreadWatchdog.checkInterval",
"100ms")
+ conf.set(CelebornConf.WORKER_READBUFFER_CHECK_THREAD_INTERVAL.key, "100ms")
val readBufferDispatcher = new ReadBufferDispatcher(mockedMemoryManager,
conf, null)
val threadId1 = readBufferDispatcher.dispatcherThread.get().getId
readBufferDispatcher.stopFlag = true
@@ -75,4 +75,32 @@ class ReadBufferDispactherSuite extends CelebornFunSuite {
val threadId2 = readBufferDispatcher.dispatcherThread.get().getId
assert(threadId1 != threadId2)
}
+
+ test("[CELEBORN-2192] ReadBufferDispatcher should add timeout constraints to
fast fail in case of timeout") {
+ val memoryManager = mock(classOf[MemoryManager])
+ val readBufferDispatcher = new ReadBufferDispatcher(
+ memoryManager,
+ new CelebornConf().set(
+ CelebornConf.WORKER_READBUFFER_PROCESS_TIMEOUT.key,
+ CelebornConf.WORKER_READBUFFER_ALLOCATIONWAIT.defaultValueString),
+ null)
+ when(memoryManager.readBufferAvailable(anyInt())).thenAnswer(new
Answer[Boolean] {
+ override def answer(invocationOnMock: InvocationOnMock): Boolean = false
+ })
+ val completableFuture = new CompletableFuture[Void]()
+ val readBufferRequest = new ReadBufferRequest(
+ Integer.MAX_VALUE,
+ Integer.MAX_VALUE,
+ new ReadBufferListener {
+ override def notifyBuffers(
+ allocatedBuffers: util.List[ByteBuf],
+ throwable: Throwable): Unit = {
+ assert(throwable != null)
+ assert(throwable.isInstanceOf[TimeoutException])
+ completableFuture.complete(null);
+ }
+ })
+ readBufferDispatcher.addBufferRequest(readBufferRequest)
+ completableFuture.get(5, TimeUnit.SECONDS)
+ }
}