This is an automated email from the ASF dual-hosted git repository.

rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/main by this push:
     new 2dd1b7aac [CELEBORN-2210] When a flushBuffer consolidation OOM exception occurs…
2dd1b7aac is described below

commit 2dd1b7aac4d488c0496da1fc77cab9c79bccd69b
Author: xxx <[email protected]>
AuthorDate: Fri Jan 2 20:04:50 2026 +0800

    [CELEBORN-2210] When a flushBuffer consolidation OOM exception occurs…
    
    …, support setting the Buffer for fileInfo.
    
    ### What changes were proposed in this pull request?
    
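    When flushBuffer.consolidate() throws an OutOfMemoryError in MemoryTierWriter.closeStreams(), log the error and still set the buffer on fileInfo by moving fileInfo.setBuffer(flushBuffer) into a finally block.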
    
    ### Why are the changes needed?
    
    Currently, an OutOfMemoryError thrown by flushBuffer.consolidate() propagates out of closeStreams() before fileInfo.setBuffer(flushBuffer) is reached, so fileInfo is never given the buffer.
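    
    A minimal, self-contained sketch of this ordering problem follows. It assumes hypothetical stand-in types: FakeFlushBuffer, FakeFileInfo and CloseStreamsSketch are illustrative names, not Celeborn classes, and the real flushBuffer and fileInfo types are not reproduced. The sketch contrasts the pre-patch closeStreams() body with the try/catch/finally shape used in this commit:
    
    ```scala
    // Hypothetical stand-in for flushBuffer; consolidate() can be told to fail with an OOM.
    final class FakeFlushBuffer(failWithOom: Boolean) {
      def consolidate(): Unit =
        if (failWithOom) throw new OutOfMemoryError("simulated consolidation failure")
    }
    
    // Hypothetical stand-in for fileInfo; records whether a buffer was set.
    final class FakeFileInfo {
      var buffer: Option[FakeFlushBuffer] = None
      def setBuffer(b: FakeFlushBuffer): Unit = buffer = Some(b)
    }
    
    object CloseStreamsSketch {
      // Pre-patch ordering: an OOM thrown by consolidate() skips setBuffer().
      def closeStreamsOld(flushBuffer: FakeFlushBuffer, fileInfo: FakeFileInfo): Unit = {
        flushBuffer.consolidate()
        fileInfo.setBuffer(flushBuffer)
      }
    
      // Patched ordering: the OOM is reported and swallowed, and the finally
      // block sets the buffer whether or not consolidate() succeeded.
      def closeStreamsNew(flushBuffer: FakeFlushBuffer, fileInfo: FakeFileInfo): Unit = {
        try {
          flushBuffer.consolidate()
        } catch {
          case oom: OutOfMemoryError =>
            // The real code calls logError(...) here; stderr keeps the sketch dependency-free.
            System.err.println(s"failed to consolidate flush buffer: ${oom.getMessage}")
        } finally {
          fileInfo.setBuffer(flushBuffer)
        }
      }
    
      // Runs one variant with a buffer whose consolidate() always throws and
      // reports whether fileInfo ended up holding the buffer.
      def bufferSetAfterOom(close: (FakeFlushBuffer, FakeFileInfo) => Unit): Boolean = {
        val fileInfo = new FakeFileInfo
        try {
          close(new FakeFlushBuffer(failWithOom = true), fileInfo)
        } catch {
          case _: OutOfMemoryError => () // only the pre-patch variant lets the error escape
        }
        fileInfo.buffer.isDefined
      }
    }
    ```
    
    For example, CloseStreamsSketch.bufferSetAfterOom((b, f) => CloseStreamsSketch.closeStreamsOld(b, f)) returns false. The same call with closeStreamsNew returns true. A finally block runs for any Throwable, so setBuffer is also reached when consolidate() fails with some other error. Only the OutOfMemoryError is swallowed; other errors still propagate.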
    
    ### Does this PR resolve a correctness bug?
    
    NO
    
    ### Does this PR introduce _any_ user-facing change?
    
    NO
    
    ### How was this patch tested?
    
    CI
    
    Closes #3547 from xy2953396112/CELEBORN-2210.
    
    Authored-by: xxx <[email protected]>
    Signed-off-by: Shuang <[email protected]>
---
 .../service/deploy/worker/storage/TierWriter.scala         | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 352abe1e6..7ff64f7bd 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -341,8 +341,18 @@ class MemoryTierWriter(
   }
 
   override def closeStreams(): Unit = {
-    flushBuffer.consolidate()
-    fileInfo.setBuffer(flushBuffer)
+    try {
+      flushBuffer.consolidate()
+    } catch {
+      case oom: OutOfMemoryError =>
+        logError(
+          s"MemoryTierWriter shuffleKey:${partitionDataWriterContext.getShuffleKey}, " +
+            s"partitionId:${partitionDataWriterContext.getPartitionLocation.getFileName} " +
+            s"failed to consolidate flush buffer due to OutOfMemoryError.",
+          oom)
+    } finally {
+      fileInfo.setBuffer(flushBuffer)
+    }
   }
 
   override def takeBufferInternal(): CompositeByteBuf = {
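
The hunk matches OutOfMemoryError by type rather than using Scala's scala.util.control.NonFatal extractor. The commit does not say why. One likely reason is that NonFatal classifies VirtualMachineError, the superclass of OutOfMemoryError, as fatal. A `case NonFatal(e) =>` clause would therefore let the error escape closeStreams(). The sketch below compares the two handler styles; CatchStyleSketch and its methods are hypothetical names used only for illustration:

```scala
import scala.util.control.NonFatal

object CatchStyleSketch {
  // Mirrors a `case NonFatal(e) =>` handler: true if the throwable is handled there,
  // false if it escapes the handler.
  def handledByNonFatal(t: Throwable): Boolean =
    try {
      try { throw t } catch { case NonFatal(_) => true }
    } catch {
      case _: Throwable => false
    }

  // Mirrors the handler in this commit, `case oom: OutOfMemoryError =>`.
  def handledByOomCase(t: Throwable): Boolean =
    try {
      try { throw t } catch { case _: OutOfMemoryError => true }
    } catch {
      case _: Throwable => false
    }
}
```

With these definitions, handledByNonFatal(new OutOfMemoryError("x")) is false and handledByOomCase(new OutOfMemoryError("x")) is true. For an ordinary RuntimeException, the results are reversed.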
