This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 2dd1b7aac [CELEBORN-2210] When a flushBuffer consolidation OOM exception occurs…
2dd1b7aac is described below
commit 2dd1b7aac4d488c0496da1fc77cab9c79bccd69b
Author: xxx <[email protected]>
AuthorDate: Fri Jan 2 20:04:50 2026 +0800
[CELEBORN-2210] When a flushBuffer consolidation OOM exception occurs…
…, support setting the Buffer for fileInfo.
### What changes were proposed in this pull request?
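When a flushBuffer consolidation OOM exception occurs, still set the buffer on fileInfo: in `MemoryTierWriter.closeStreams()`, `flushBuffer.consolidate()` is wrapped in a try/catch/finally, an `OutOfMemoryError` thrown during consolidation is logged, and `fileInfo.setBuffer(flushBuffer)` is always called in the `finally` block.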
### Why are the changes needed?
When `flushBuffer.consolidate()` throws an `OutOfMemoryError`, the error propagates out of `closeStreams()` before `fileInfo.setBuffer(flushBuffer)` is reached, so the buffer is never set on fileInfo.
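To make this failure mode concrete, here is a minimal Scala sketch that mirrors only the control flow of the old and new `closeStreams()`. `StubFlushBuffer`, `StubFileInfo`, `closeStreamsOld` and `closeStreamsNew` are hypothetical stand-ins, not Celeborn classes, and the OOM is simulated by throwing `OutOfMemoryError` directly rather than by actually exhausting memory.

```scala
// Hypothetical stand-in for flushBuffer: consolidate() can be told to fail with an OOM.
class StubFlushBuffer(failOnConsolidate: Boolean) {
  def consolidate(): Unit =
    if (failOnConsolidate) throw new OutOfMemoryError("simulated OOM during consolidation")
}

// Hypothetical stand-in for fileInfo: records whether a buffer was ever handed over.
class StubFileInfo {
  var buffer: Option[StubFlushBuffer] = None
  def setBuffer(b: StubFlushBuffer): Unit = buffer = Some(b)
}

// Old control flow: an OOM from consolidate() skips setBuffer() entirely.
def closeStreamsOld(flushBuffer: StubFlushBuffer, fileInfo: StubFileInfo): Unit = {
  flushBuffer.consolidate()
  fileInfo.setBuffer(flushBuffer)
}

// Patched control flow: the OOM is reported and swallowed, and setBuffer() runs in finally.
def closeStreamsNew(flushBuffer: StubFlushBuffer, fileInfo: StubFileInfo): Unit = {
  try {
    flushBuffer.consolidate()
  } catch {
    case oom: OutOfMemoryError =>
      System.err.println(s"failed to consolidate flush buffer: ${oom.getMessage}")
  } finally {
    fileInfo.setBuffer(flushBuffer)
  }
}

// Old behaviour: the caller sees the OOM and fileInfo never receives the buffer.
val oldInfo = new StubFileInfo
try closeStreamsOld(new StubFlushBuffer(failOnConsolidate = true), oldInfo)
catch { case _: OutOfMemoryError => () }

// Patched behaviour: no exception escapes and fileInfo still receives the buffer.
val newInfo = new StubFileInfo
closeStreamsNew(new StubFlushBuffer(failOnConsolidate = true), newInfo)
```

With the old control flow `oldInfo.buffer` stays `None`; with the patched one `newInfo.buffer` is set even though consolidation failed.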
### Does this PR resolve a correctness bug?
NO
### Does this PR introduce _any_ user-facing change?
NO
### How was this patch tested?
CI
Closes #3547 from xy2953396112/CELEBORN-2210.
Authored-by: xxx <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../service/deploy/worker/storage/TierWriter.scala | 14 ++++++++++++--
1 file changed, 12 insertions(+), 2 deletions(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index 352abe1e6..7ff64f7bd 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -341,8 +341,18 @@ class MemoryTierWriter(
   }
 
   override def closeStreams(): Unit = {
-    flushBuffer.consolidate()
-    fileInfo.setBuffer(flushBuffer)
+    try {
+      flushBuffer.consolidate()
+    } catch {
+      case oom: OutOfMemoryError =>
+        logError(
+          s"MemoryTierWriter shuffleKey:${partitionDataWriterContext.getShuffleKey}, " +
+            s"partitionId:${partitionDataWriterContext.getPartitionLocation.getFileName} " +
+            s"failed to consolidate flush buffer due to OutOfMemoryError.",
+          oom)
+    } finally {
+      fileInfo.setBuffer(flushBuffer)
+    }
   }
 
   override def takeBufferInternal(): CompositeByteBuf = {
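One consequence of the try/catch/finally shape in the hunk above, sketched here with a hypothetical `patchedClose` helper and simulated exceptions (not Celeborn code): only `OutOfMemoryError` is caught and logged, but the `finally` block runs for any throwable. So `setBuffer` is still called when `consolidate()` fails in some other way, and that other exception still propagates to the caller.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical helper mirroring only the try/catch/finally shape of the patched
// closeStreams(); `consolidate` stands in for flushBuffer.consolidate() and each
// entry appended to `events` records which branch ran.
def patchedClose(consolidate: () => Unit, events: ListBuffer[String]): Unit = {
  try {
    consolidate()
  } catch {
    // Only OutOfMemoryError is caught (and, in the real code, logged).
    case _: OutOfMemoryError => events += "logged OOM"
  } finally {
    // Runs whether consolidate() succeeds, throws an OOM, or throws anything else.
    events += "setBuffer"
  }
}

// Case 1: a simulated OOM is swallowed, then setBuffer runs.
val oomEvents = ListBuffer.empty[String]
patchedClose(() => throw new OutOfMemoryError("simulated"), oomEvents)

// Case 2: a non-OOM failure is not caught, but setBuffer still runs before it propagates.
val otherEvents = ListBuffer.empty[String]
val propagated =
  try {
    patchedClose(() => throw new IllegalStateException("simulated"), otherEvents)
    false
  } catch {
    case _: IllegalStateException => true
  }
```

In the first case the events are "logged OOM" followed by "setBuffer" and nothing escapes; in the second only "setBuffer" is recorded and the `IllegalStateException` reaches the caller.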