This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 372ef79a0 [CELEBORN-1760] OOM causes disk buffer unable to be released
372ef79a0 is described below

commit 372ef79a0867ec44152aa1b4f49c78fadf05d47d
Author: Xianming Lei <[email protected]>
AuthorDate: Tue Dec 10 13:47:57 2024 +0800

    [CELEBORN-1760] OOM causes disk buffer unable to be released
    
    ### What changes were proposed in this pull request?
    When an OutOfMemoryError (OOM) occurs in flushBuffer.addComponent, two problems arise:
    
    1. decrementPendingWrites is not called, so closing the PartitionDataWriter stalls for a period of time during commit.
    2. The ByteBuf is not released after the OOM, which leaks memory.
    
    ### Why are the changes needed?
    To fix the disk buffer not being released after an OOM.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, it fixes a memory leak issue in some corner cases.
    
    ### How was this patch tested?
    
    Tested manually. Two nodes ran without this PR, and their memory could not be released. One node ran with this PR, and its memory was released; its usage was clearly lower than on the other two nodes.
    
![image](https://github.com/user-attachments/assets/3fe846ec-6ee8-432a-be7a-a7efb7c102d0)
    
    Closes #2975 from leixm/CELEBORN-1760.
    
    Authored-by: Xianming Lei <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../service/deploy/worker/storage/PartitionDataWriter.java   | 12 +++++++++++-
 .../celeborn/service/deploy/worker/PushDataHandler.scala     |  2 +-
 2 files changed, 12 insertions(+), 2 deletions(-)
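
The PartitionDataWriter change follows a release-on-failure pattern: whatever was retained or reserved before the failing call is undone before the error is rethrown. The following is a minimal, self-contained sketch of that pattern, not the Celeborn code itself. `RefCounted`, `reservedBytes`, and the `failWithOom` flag are hypothetical stand-ins for the ByteBuf, the MemoryManager accounting, and a real allocation failure, and the sketch assumes the bytes were reserved before the write, which the diff implies but does not show.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class ReleaseOnOomSketch {

  /** Hypothetical stand-in for a reference-counted buffer such as a ByteBuf. */
  public static final class RefCounted {
    public final AtomicInteger refCnt = new AtomicInteger(1);

    public void retain() {
      refCnt.incrementAndGet();
    }

    public void release() {
      refCnt.decrementAndGet();
    }
  }

  /** Hypothetical stand-in for the worker's buffer accounting. */
  public final AtomicLong reservedBytes = new AtomicLong();

  /** Simulates flushBuffer.addComponent; the OOM is injected, not real. */
  private static void addComponent(RefCounted data, boolean failWithOom) {
    if (failWithOom) {
      throw new OutOfMemoryError("simulated");
    }
  }

  public void write(RefCounted data, long numBytes, boolean failWithOom) {
    // Assumption: the bytes are accounted for before the buffer is appended.
    reservedBytes.addAndGet(numBytes);
    data.retain();
    try {
      addComponent(data, failWithOom);
    } catch (OutOfMemoryError oom) {
      // Undo the retain and the reservation, then let the caller see the error.
      data.release();
      reservedBytes.addAndGet(-numBytes);
      throw oom;
    }
  }

  public static void main(String[] args) {
    ReleaseOnOomSketch writer = new ReleaseOnOomSketch();
    RefCounted data = new RefCounted();
    try {
      writer.write(data, 1024L, true);
    } catch (OutOfMemoryError expected) {
      System.out.println("refCnt after failed write: " + data.refCnt.get());
      System.out.println("reserved after failed write: " + writer.reservedBytes.get());
    }
  }
}
```

Without the catch block, the failed write in this sketch would leave the reference count one higher than before the call and the 1024 bytes still reserved, which mirrors the leak described in the commit message.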

diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 9032bfe91..185eeb950 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -416,7 +416,17 @@ public abstract class PartitionDataWriter implements DeviceObserver {
       }
 
       data.retain();
-      flushBuffer.addComponent(true, data);
+      try {
+        flushBuffer.addComponent(true, data);
+      } catch (OutOfMemoryError oom) {
+        data.release();
+        if (isMemoryShuffleFile.get()) {
+          MemoryManager.instance().releaseMemoryFileStorage(numBytes);
+        } else {
+          MemoryManager.instance().releaseDiskBuffer(numBytes);
+        }
+        throw oom;
+      }
       if (isMemoryShuffleFile.get()) {
         memoryFileInfo.updateBytesFlushed(numBytes);
       }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 0890f8355..15e217c88 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -1473,7 +1473,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
         fileWriter.write(body)
         result(index) = StatusCode.SUCCESS
       } catch {
-        case e: Exception =>
+        case e: Throwable =>
           if (e.isInstanceOf[AlreadyClosedException]) {
             val (mapId, attemptId) = getMapAttempt(body)
             val endedAttempt =
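
The PushDataHandler change widens the catch from Exception to Throwable. OutOfMemoryError is a subclass of Error, not of Exception, so an Exception handler never sees it. The sketch below shows that difference in plain Java, under the assumption that the handler's failure path only runs when the catch matches. The method names and the "SUCCESS"/"FAILED" strings are hypothetical and do not correspond to Celeborn's status codes.

```java
public class CatchThrowableSketch {

  /** Mirrors the old behavior: only Exception subclasses reach the handler. */
  public static String handleCatchingException(Runnable write) {
    try {
      write.run();
      return "SUCCESS";
    } catch (Exception e) {
      return "FAILED";
    }
  }

  /** Mirrors the new behavior: Errors such as OutOfMemoryError are handled too. */
  public static String handleCatchingThrowable(Runnable write) {
    try {
      write.run();
      return "SUCCESS";
    } catch (Throwable t) {
      return "FAILED";
    }
  }

  public static void main(String[] args) {
    // Simulated failure; no real memory pressure is involved.
    Runnable oomWrite = () -> {
      throw new OutOfMemoryError("simulated");
    };

    System.out.println("Throwable handler: " + handleCatchingThrowable(oomWrite));

    try {
      handleCatchingException(oomWrite);
    } catch (OutOfMemoryError escaped) {
      // The Exception handler was skipped, so its cleanup path never ran.
      System.out.println("Exception handler: error escaped");
    }
  }
}
```

Catching Throwable is broad, so it suits a boundary like this one, where the handler has cleanup and status reporting to do whatever the cause of the failure.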
