This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/branch-0.5 by this push:
new 87b83ade6 [CELEBORN-1760] OOM causes disk buffer unable to be released
87b83ade6 is described below
commit 87b83ade6ab073d9f20ad3e12a9bddd55939d293
Author: Xianming Lei <[email protected]>
AuthorDate: Tue Dec 10 13:47:57 2024 +0800
[CELEBORN-1760] OOM causes disk buffer unable to be released
### What changes were proposed in this pull request?
When an OutOfMemoryError (OOM) occurs in flushBuffer.addComponent, two problems
arise:
1. decrementPendingWrites is not called, so closing the PartitionDataWriter
blocks for a period of time during commit.
2. The retained ByteBuf is not released after the OOM, causing a memory leak.
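Some background on why an OOM bypasses ordinary error handling: OutOfMemoryError extends java.lang.Error, not java.lang.Exception. A `catch (Exception e)` clause, or Scala's `case e: Exception =>`, therefore never runs for it. This appears to be why the PushDataHandler change in this patch widens its handler to `Throwable`.

The standalone Java sketch below only demonstrates this JVM type hierarchy. `CatchScopeDemo` and `classify` are hypothetical names and are not Celeborn code.

```java
// Hypothetical demo (not Celeborn code): which handler sees a given failure?
public class CatchScopeDemo {

  // Runs body and reports which catch clause observed its failure, if any.
  static String classify(Runnable body) {
    try {
      try {
        body.run();
        return "no failure";
      } catch (Exception e) {
        // Only Exception subclasses land here; Errors pass straight through.
        return "caught as Exception";
      }
    } catch (Throwable t) {
      // OutOfMemoryError (an Error) is only caught by a Throwable handler.
      return "escaped catch (Exception), caught as Throwable";
    }
  }

  public static void main(String[] args) {
    // prints "caught as Exception"
    System.out.println(classify(() -> { throw new IllegalStateException("closed"); }));
    // prints "escaped catch (Exception), caught as Throwable"
    System.out.println(classify(() -> { throw new OutOfMemoryError("simulated"); }));
  }
}
```

Catching `Throwable` is deliberately broad. Scala's `scala.util.control.NonFatal` extractor would not help here: it treats every `VirtualMachineError`, including OutOfMemoryError, as fatal and does not match it.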
### Why are the changes needed?
To fix the disk buffer not being released after an OOM.
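The PartitionDataWriter change follows a release-then-rethrow pattern. When addComponent fails, two things are handed back before the OOM is rethrown: the reference taken by `data.retain()` and the `numBytes` presumably reserved earlier through MemoryManager.

Below is a minimal sketch of that pattern. It assumes the bytes are reserved before `addComponent` is called. `RefCountedBuf`, `FlushBuffer`, and `reservedBytes` are hypothetical stand-ins for Netty's ByteBuf, the composite flush buffer, and MemoryManager's accounting. The OOM is simulated rather than real.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-ins; none of these are Netty or Celeborn APIs.
public class ReleaseOnOomDemo {

  // Minimal reference-counted buffer: starts with one reference.
  static class RefCountedBuf {
    int refCnt = 1;
    final int size;
    RefCountedBuf(int size) { this.size = size; }
    void retain() { refCnt++; }
    void release() { refCnt--; }
  }

  // Composite buffer that simulates an allocation failure once "full".
  static class FlushBuffer {
    final List<RefCountedBuf> components = new ArrayList<>();
    final int maxComponents;
    FlushBuffer(int maxComponents) { this.maxComponents = maxComponents; }

    void addComponent(RefCountedBuf buf) {
      if (components.size() >= maxComponents) {
        throw new OutOfMemoryError("simulated");
      }
      components.add(buf);
    }
  }

  // Hypothetical memory accounting, standing in for MemoryManager counters.
  static final AtomicLong reservedBytes = new AtomicLong();

  // Reserve and retain, then undo both if adding the component fails.
  static void write(FlushBuffer flushBuffer, RefCountedBuf data) {
    reservedBytes.addAndGet(data.size);
    data.retain();
    try {
      flushBuffer.addComponent(data);
    } catch (OutOfMemoryError oom) {
      data.release();                      // drop the reference taken above
      reservedBytes.addAndGet(-data.size); // return the reserved bytes
      throw oom;                           // callers still see the error
    }
  }

  public static void main(String[] args) {
    RefCountedBuf data = new RefCountedBuf(1024);
    try {
      write(new FlushBuffer(0), data);
    } catch (OutOfMemoryError expected) {
      // handled by the caller in this demo
    }
    // prints refCnt=1, reserved=0
    System.out.println("refCnt=" + data.refCnt + ", reserved=" + reservedBytes.get());
  }
}
```

Rethrowing rather than swallowing the error keeps it visible to higher-level handlers, which can then still react to it. The widened catch in PushDataHandler is one such handler.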
### Does this PR introduce _any_ user-facing change?
Yes, it fixes a memory leak issue in some corner cases.
### How was this patch tested?
Tested on three nodes: two without this PR and one with it. Memory on the two
unpatched nodes could not be released. Memory on the patched node was released
and stayed clearly lower than on the other two.

Closes #2975 from leixm/CELEBORN-1760.
Authored-by: Xianming Lei <[email protected]>
Signed-off-by: Shuang <[email protected]>
(cherry picked from commit 372ef79a0867ec44152aa1b4f49c78fadf05d47d)
Signed-off-by: Shuang <[email protected]>
---
.../service/deploy/worker/storage/PartitionDataWriter.java | 12 +++++++++++-
.../celeborn/service/deploy/worker/PushDataHandler.scala | 2 +-
2 files changed, 12 insertions(+), 2 deletions(-)
diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
index 03930bc60..9c7ee3d16 100644
--- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
+++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/PartitionDataWriter.java
@@ -337,7 +337,17 @@ public abstract class PartitionDataWriter implements DeviceObserver {
     }
     data.retain();
-    flushBuffer.addComponent(true, data);
+    try {
+      flushBuffer.addComponent(true, data);
+    } catch (OutOfMemoryError oom) {
+      data.release();
+      if (isMemoryShuffleFile.get()) {
+        MemoryManager.instance().releaseMemoryFileStorage(numBytes);
+      } else {
+        MemoryManager.instance().releaseDiskBuffer(numBytes);
+      }
+      throw oom;
+    }
     if (isMemoryShuffleFile.get()) {
       memoryFileInfo.updateBytesFlushed(numBytes);
     }
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
index 5f9696f6e..b35f730bd 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/PushDataHandler.scala
@@ -1265,7 +1265,7 @@ class PushDataHandler(val workerSource: WorkerSource) extends BaseMessageHandler
     try {
       fileWriter.write(body)
     } catch {
-      case e: Exception =>
+      case e: Throwable =>
         if (e.isInstanceOf[AlreadyClosedException]) {
           val (mapId, attemptId) = getMapAttempt(body)
           val endedAttempt =