This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 0e3bc383a [CELEBORN-1379] Catch Throwable for ReadBufferDispatcher
thread
0e3bc383a is described below
commit 0e3bc383a5df3dd91c5336742755e182d052049a
Author: lvshuang.xjs <[email protected]>
AuthorDate: Thu Apr 11 16:07:04 2024 +0800
[CELEBORN-1379] Catch Throwable for ReadBufferDispatcher thread
### What changes were proposed in this pull request?
Catch `Throwable` in the dispatcher loop and recycle any buffers already allocated for the failed request.
### Why are the changes needed?
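When the BufferAllocator tries to allocate a new ByteBuf, it may throw an
OutOfDirectMemoryError. If this error is not caught, the ReadBufferDispatcher
thread exits and no further read-buffer requests are served. We should catch
Throwable in ReadBufferDispatcher to prevent the thread from exiting.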
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Passed GitHub Actions (GA) CI.
Closes #2452 from RexXiong/CELEBORN-1379.
Lead-authored-by: lvshuang.xjs <[email protected]>
Co-authored-by: Shuang <[email protected]>
Signed-off-by: mingji <[email protected]>
---
.../deploy/worker/memory/ReadBufferDispatcher.java | 58 +++++++++++++---------
1 file changed, 34 insertions(+), 24 deletions(-)
diff --git
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index 1ca32b620..17bbaa3b8 100644
---
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -76,33 +76,43 @@ public class ReadBufferDispatcher extends Thread {
logger.info("Buffer dispatcher is closing");
}
- if (request != null) {
- long start = System.nanoTime();
- List<ByteBuf> buffers = new ArrayList<>();
- int bufferSize = request.getBufferSize();
- while (buffers.size() < request.getNumber()) {
- if (memoryManager.readBufferAvailable(bufferSize)) {
- memoryManager.changeReadBufferCounter(bufferSize);
- ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
- buffers.add(buf);
- allocatedReadBuffers.increment();
- } else {
- try {
- // If dispatcher can not allocate requested buffers, it will
wait here until necessary
- // buffers are get.
- Thread.sleep(this.readBufferAllocationWait);
- } catch (InterruptedException e) {
- logger.info("Buffer dispatcher is closing");
+ List<ByteBuf> buffers = null;
+ try {
+ if (request != null) {
+ long start = System.nanoTime();
+ int bufferSize = request.getBufferSize();
+ buffers = new ArrayList<>();
+ while (buffers.size() < request.getNumber()) {
+ if (memoryManager.readBufferAvailable(bufferSize)) {
+ ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
+ buffers.add(buf);
+ memoryManager.changeReadBufferCounter(bufferSize);
+ allocatedReadBuffers.increment();
+ } else {
+ try {
+ // If dispatcher can not allocate requested buffers, it will
wait here until
+ // necessary buffers are get.
+ Thread.sleep(this.readBufferAllocationWait);
+ } catch (InterruptedException e) {
+ logger.info("Buffer dispatcher is closing");
+ }
}
}
+ long end = System.nanoTime();
+ logger.debug(
+ "process read buffer request using {} ms",
+ TimeUnit.NANOSECONDS.toMillis(end - start));
+ request.getBufferListener().notifyBuffers(buffers, null);
+ } else {
+ // Free buffer pool memory to main direct memory when dispatcher is
idle.
+ readBufferAllocator.trimCurrentThreadCache();
+ }
+ } catch (Throwable e) {
+ logger.error(e.getMessage(), e);
+ // recycle all allocated buffers
+ if (buffers != null) {
+ buffers.forEach(this::recycle);
}
- long end = System.nanoTime();
- logger.debug(
- "process read buffer request using {} ms",
TimeUnit.NANOSECONDS.toMillis(end - start));
- request.getBufferListener().notifyBuffers(buffers, null);
- } else {
- // Free buffer pool memory to main direct memory when dispatcher is
idle.
- readBufferAllocator.trimCurrentThreadCache();
}
}
}