This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new fff97252a [CELEBORN-1760][FOLLOWUP] Remove redundant release on data added in flushBuffer
fff97252a is described below

commit fff97252ad7ef59ff68046e0a33392df453aded7
Author: xinyuwang1 <[email protected]>
AuthorDate: Thu May 8 18:01:12 2025 +0800

    [CELEBORN-1760][FOLLOWUP] Remove redundant release on data added in flushBuffer
    
    ### What changes were proposed in this pull request?
    Remove the redundant release of data after OutOfDirectMemoryError is thrown in flushBuffer.addComponent.
    
    ### Why are the changes needed?
    OutOfDirectMemoryError can be thrown by flushBuffer.addComponent because, after appending a new component, CompositeByteBuf checks whether the number of components exceeds its maximum limit. If it does, the existing components are consolidated into one large component, which requires allocating new off-heap memory; if memory is insufficient at that point, OutOfDirectMemoryError is thrown, but the new component has already been added to flushBuffer at this tim [...]
    The unreleased component does not cause a memory leak, because it is released normally in returnBuffer (on flush, file destroy, or file close).
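    The sequence above can be sketched against Netty's CompositeByteBuf API (a minimal illustration, not the worker code; the tiny maxNumComponents is only there to make consolidation trigger early):
    
    ```scala
    import io.netty.buffer.{ByteBuf, Unpooled}
    
    // addComponent(true, buf) first appends buf to the composite (which takes
    // over buf's reference), and only then checks maxNumComponents. If the
    // limit is exceeded, all components are consolidated into one new buffer;
    // that new allocation is what can throw OutOfDirectMemoryError. So on
    // failure, buf is already owned by flushBuffer and the caller must not
    // release it again.
    val flushBuffer = Unpooled.compositeBuffer(2) // tiny limit, demo only
    
    def append(buf: ByteBuf): Unit = {
      buf.retain()
      try {
        flushBuffer.addComponent(true, buf)
      } catch {
        case oom: OutOfMemoryError =>
          // No buf.release() here: the component is released later in
          // returnBuffer (flush, file destroy, or file close).
          throw oom
      }
    }
    ```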
    If writeLocalData does not catch OutOfDirectMemoryError, the impact is as follows:
    1. With a single replica, if PR https://github.com/apache/celeborn/pull/3049 is not merged, CommitFiles blocks in waitPendingWrites and fails, because writeLocalData does not correctly call decrementPendingWrites. This does not keep flushBuffer in memory for long, however: when the shuffle expires, the file is destroyed, flushBuffer is returned, and that memory is released.
    2. With two replicas, in addition to the above, the EventLoop thread that the replicate-client belongs to blocks at Await.result(writePromise.future, Duration.Inf), because writePromise is never completed. That thread then stops processing other PushData written by worker-data-replicator to the channels of the same EventLoop; this data accumulates in the EventLoop's taskQueue and cannot be canceled [...]
    <img width="1081" alt="image" src="https://github.com/user-attachments/assets/a90ac423-443e-42f9-a0d2-cc49f24f6476" />
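    The blocked Await described in point 2 above corresponds to a pattern like the following (names taken from the description; a simplified sketch, not the actual worker code):
    
    ```scala
    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration.Duration
    
    // writeLocalData is expected to complete writePromise, with either success
    // or failure. If it dies with an uncaught OutOfDirectMemoryError before
    // doing so, the promise is never completed and this Await blocks forever,
    // pinning the replicate-client's EventLoop thread and stalling every
    // channel that thread serves.
    val writePromise = Promise[Unit]()
    // ... writeLocalData throws OutOfDirectMemoryError, never completes it ...
    Await.result(writePromise.future, Duration.Inf) // blocks indefinitely
    ```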
    
    Therefore, to avoid these problems after OutOfDirectMemoryError occurs in flushBuffer.addComponent, it is enough to catch OutOfDirectMemoryError in writeLocalData; there is no need to release data after addComponent.
    
    I simulated the scenario where addComponent throws OutOfDirectMemoryError and data is released afterwards: a refCnt error occurred.
    
[oom_fix_error_release.log](https://github.com/user-attachments/files/19863484/oom_fix_error_release.log)
    
    I also simulated the scenario where addComponent throws OutOfDirectMemoryError and data is not released afterwards: no refCnt error occurred, CommitFiles succeeded, the Spark task succeeded, and after CommitFiles the worker's disk buffer count dropped to 0.
    
[celeborn_1760_followup_worker.log](https://github.com/user-attachments/files/19864486/celeborn_1760_followup_worker.log)
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Manual test.
    
    Closes #3224 from littlexyw/CELEBORN-1760-FOLLOWUP.
    
    Authored-by: xinyuwang1 <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../service/deploy/worker/storage/TierWriter.scala | 27 ++++++++++++----------
 1 file changed, 15 insertions(+), 12 deletions(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index ab7c39a66..edfbff675 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -316,16 +316,19 @@ class MemoryTierWriter(
 
   override def writeInternal(buf: ByteBuf): Unit = {
     buf.retain()
+    val numBytes = buf.readableBytes()
     try {
       flushBuffer.addComponent(true, buf)
     } catch {
       case oom: OutOfMemoryError =>
-        MemoryManager.instance.releaseMemoryFileStorage(buf.readableBytes())
+        // memory tier writer will not flush
+        // add the bytes into flusher buffer is flush completed
+        metaHandler.afterFlush(numBytes)
+        MemoryManager.instance.incrementMemoryFileStorage(numBytes)
         throw oom
     }
     // memory tier writer will not flush
     // add the bytes into flusher buffer is flush completed
-    val numBytes = buf.readableBytes()
     metaHandler.afterFlush(numBytes)
     MemoryManager.instance().incrementMemoryFileStorage(numBytes)
   }
@@ -415,15 +418,16 @@ class LocalTierWriter(
     buf.retain()
     try {
       flushBuffer.addComponent(true, buf)
-      MemoryManager.instance.incrementDiskBuffer(numBytes)
-      if (userCongestionControlContext != null)
-        userCongestionControlContext.updateProduceBytes(numBytes)
     } catch {
       case oom: OutOfMemoryError =>
-        buf.release()
-        MemoryManager.instance().releaseDiskBuffer(numBytes)
-        throw oom;
+        MemoryManager.instance.incrementDiskBuffer(numBytes)
+        if (userCongestionControlContext != null)
+          userCongestionControlContext.updateProduceBytes(numBytes)
+        throw oom
     }
+    MemoryManager.instance.incrementDiskBuffer(numBytes)
+    if (userCongestionControlContext != null)
+      userCongestionControlContext.updateProduceBytes(numBytes)
   }
 
   override def evict(file: TierWriterBase): Unit = ???
@@ -620,13 +624,12 @@ class DfsTierWriter(
     buf.retain()
     try {
       flushBuffer.addComponent(true, buf)
-      MemoryManager.instance.incrementDiskBuffer(numBytes)
     } catch {
       case oom: OutOfMemoryError =>
-        buf.release()
-        MemoryManager.instance().releaseDiskBuffer(numBytes)
-        throw oom;
+        MemoryManager.instance.incrementDiskBuffer(numBytes)
+        throw oom
     }
+    MemoryManager.instance.incrementDiskBuffer(numBytes)
   }
 
   override def evict(file: TierWriterBase): Unit = ???
