This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 0e3bc383a [CELEBORN-1379] Catch Throwable for ReadBufferDispatcher 
thread
0e3bc383a is described below

commit 0e3bc383a5df3dd91c5336742755e182d052049a
Author: lvshuang.xjs <[email protected]>
AuthorDate: Thu Apr 11 16:07:04 2024 +0800

    [CELEBORN-1379] Catch Throwable for ReadBufferDispatcher thread
    
    ### What changes were proposed in this pull request?
    Catch Throwable in ReadBufferDispatcher and recycle any buffers that were already allocated for the failed request.
    
    ### Why are the changes needed?
    When the BufferAllocator tries to allocate a new ByteBuf, it may throw an
    OutOfDirectMemoryError. We should catch Throwable in ReadBufferDispatcher to
    prevent the dispatcher thread from exiting.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Passed the GitHub Actions (GA) CI checks.
    
    Closes #2452 from RexXiong/CELEBORN-1379.
    
    Lead-authored-by: lvshuang.xjs <[email protected]>
    Co-authored-by: Shuang <[email protected]>
    Signed-off-by: mingji <[email protected]>
---
 .../deploy/worker/memory/ReadBufferDispatcher.java | 58 +++++++++++++---------
 1 file changed, 34 insertions(+), 24 deletions(-)

diff --git 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
index 1ca32b620..17bbaa3b8 100644
--- 
a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
+++ 
b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/memory/ReadBufferDispatcher.java
@@ -76,33 +76,43 @@ public class ReadBufferDispatcher extends Thread {
         logger.info("Buffer dispatcher is closing");
       }
 
-      if (request != null) {
-        long start = System.nanoTime();
-        List<ByteBuf> buffers = new ArrayList<>();
-        int bufferSize = request.getBufferSize();
-        while (buffers.size() < request.getNumber()) {
-          if (memoryManager.readBufferAvailable(bufferSize)) {
-            memoryManager.changeReadBufferCounter(bufferSize);
-            ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
-            buffers.add(buf);
-            allocatedReadBuffers.increment();
-          } else {
-            try {
-              // If dispatcher can not allocate requested buffers, it will 
wait here until necessary
-              // buffers are get.
-              Thread.sleep(this.readBufferAllocationWait);
-            } catch (InterruptedException e) {
-              logger.info("Buffer dispatcher is closing");
+      List<ByteBuf> buffers = null;
+      try {
+        if (request != null) {
+          long start = System.nanoTime();
+          int bufferSize = request.getBufferSize();
+          buffers = new ArrayList<>();
+          while (buffers.size() < request.getNumber()) {
+            if (memoryManager.readBufferAvailable(bufferSize)) {
+              ByteBuf buf = readBufferAllocator.buffer(bufferSize, bufferSize);
+              buffers.add(buf);
+              memoryManager.changeReadBufferCounter(bufferSize);
+              allocatedReadBuffers.increment();
+            } else {
+              try {
+                // If dispatcher can not allocate requested buffers, it will 
wait here until
+                // necessary buffers are get.
+                Thread.sleep(this.readBufferAllocationWait);
+              } catch (InterruptedException e) {
+                logger.info("Buffer dispatcher is closing");
+              }
             }
           }
+          long end = System.nanoTime();
+          logger.debug(
+              "process read buffer request using {} ms",
+              TimeUnit.NANOSECONDS.toMillis(end - start));
+          request.getBufferListener().notifyBuffers(buffers, null);
+        } else {
+          // Free buffer pool memory to main direct memory when dispatcher is 
idle.
+          readBufferAllocator.trimCurrentThreadCache();
+        }
+      } catch (Throwable e) {
+        logger.error(e.getMessage(), e);
+        // recycle all allocated buffers
+        if (buffers != null) {
+          buffers.forEach(this::recycle);
         }
-        long end = System.nanoTime();
-        logger.debug(
-            "process read buffer request using {} ms", 
TimeUnit.NANOSECONDS.toMillis(end - start));
-        request.getBufferListener().notifyBuffers(buffers, null);
-      } else {
-        // Free buffer pool memory to main direct memory when dispatcher is 
idle.
-        readBufferAllocator.trimCurrentThreadCache();
       }
     }
   }

Reply via email to