This is an automated email from the ASF dual-hosted git repository.
rexxiong pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new fff97252a [CELEBORN-1760][FOLLOWUP] Remove redundant release on data added in flushBuffer
fff97252a is described below
commit fff97252ad7ef59ff68046e0a33392df453aded7
Author: xinyuwang1 <[email protected]>
AuthorDate: Thu May 8 18:01:12 2025 +0800
[CELEBORN-1760][FOLLOWUP] Remove redundant release on data added in flushBuffer
### What changes were proposed in this pull request?
Remove the redundant release of data after an OutOfDirectMemoryError is thrown by flushBuffer.addComponent.
### Why are the changes needed?
An OutOfDirectMemoryError can be thrown by flushBuffer.addComponent because, after a new component is added, CompositeByteBuf checks whether the number of components exceeds the maximum limit. If it does, the existing components are consolidated into one large component, which requires allocating new off-heap memory. If direct memory is insufficient at that point, an OutOfDirectMemoryError is thrown, but the new component has already been added to flushBuffer at this tim [...]
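The ordering described above is the key point: the component is appended first, and only afterwards does the consolidation step allocate new memory and potentially fail. The following is a toy sketch of that ordering (it is not Netty's actual CompositeByteBuf implementation; the ToyBuf/ToyComposite classes and the failConsolidation flag are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for a ref-counted buffer.
class ToyBuf {
    int refCnt = 1;
}

// Minimal stand-in for a composite buffer that consolidates components
// once their count exceeds a maximum.
class ToyComposite {
    final int maxComponents;
    final List<ToyBuf> components = new ArrayList<>();
    boolean failConsolidation; // simulates insufficient direct memory

    ToyComposite(int maxComponents) { this.maxComponents = maxComponents; }

    void addComponent(ToyBuf buf) {
        components.add(buf); // 1. the component is added first
        if (components.size() > maxComponents) {
            // 2. consolidation would allocate a new merged buffer here;
            //    this is where an OutOfDirectMemoryError can be thrown
            if (failConsolidation) {
                throw new OutOfMemoryError("simulated OutOfDirectMemoryError");
            }
        }
    }
}

public class ConsolidationSketch {
    public static void main(String[] args) {
        ToyComposite composite = new ToyComposite(1);
        composite.failConsolidation = true;
        ToyBuf buf = new ToyBuf();
        composite.addComponent(new ToyBuf()); // first add: no consolidation
        try {
            composite.addComponent(buf); // second add: consolidation fails
        } catch (OutOfMemoryError oom) {
            // buf is already inside the composite even though addComponent threw
            System.out.println(composite.components.contains(buf)); // prints true
        }
    }
}
```

Because the buffer is already owned by the composite when the error propagates, an extra release in the caller would remove a reference the composite still holds.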
Don't worry about the component not being released here causing a memory leak: it will be released normally in returnBuffer (on flush, file destroy, or file close).
If writeLocalData does not catch OutOfDirectMemoryError, the impact is as
follows:
1. With a single replica, if https://github.com/apache/celeborn/pull/3049 is not merged, commitFiles will block in waitPendingWrites and fail, because writeLocalData does not call decrementPendingWrites correctly. However, this does not keep flushBuffer in memory for long: when the shuffle expires, the file is destroyed, flushBuffer is returned, and this memory is released.
2. With two replicas, in addition to the above problem, the EventLoop thread that the replicate-client belongs to will block at Await.result(writePromise.future, Duration.Inf) because writePromise is never completed. As a result, that thread cannot process other PushData written by worker-data-replicator to the channels bound to the same EventLoop. This data accumulates in the EventLoop's taskQueue and cannot be canceled [...]
<img width="1081" alt="image" src="https://github.com/user-attachments/assets/a90ac423-443e-42f9-a0d2-cc49f24f6476" />
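The blocking in case 2 reduces to awaiting a promise that is never completed, with an infinite timeout. A minimal sketch of that failure mode, using java.util.concurrent in place of Scala's Await/Promise purely for illustration (the writePromise name mirrors the description; the 100 ms bound is added only so the sketch terminates, where the real code waits with Duration.Inf):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BlockedPromiseSketch {
    public static void main(String[] args) throws Exception {
        // The OOM path skips completing writePromise, so the awaiting
        // EventLoop thread would block forever on an unbounded wait.
        CompletableFuture<Void> writePromise = new CompletableFuture<>();
        try {
            // Real code: Await.result(writePromise.future, Duration.Inf).
            // Here the wait is bounded so the sketch can finish.
            writePromise.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("promise never completed; an unbounded await would block forever");
        }
    }
}
```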
Therefore, when an OutOfDirectMemoryError occurs in flushBuffer.addComponent, it is enough to catch it in writeLocalData; there is no need to release data after addComponent, and no memory leak results.
I simulated the scenario where addComponent threw an OutOfDirectMemoryError and data was released afterwards: a refCnt error occurred.
[oom_fix_error_release.log](https://github.com/user-attachments/files/19863484/oom_fix_error_release.log)
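The refCnt error from that experiment is the expected outcome of a double release: the catch block releases the buffer once, and returnBuffer releases it again later. A toy sketch of the mechanism (not Netty's ReferenceCounted implementation; the class and message format are made up to mirror the behavior):

```java
// Toy ref-counted buffer: releasing below zero fails, the way a second
// release on an already-released Netty buffer raises a refCnt error.
class RefCountedBuf {
    private int refCnt = 1;

    void release() {
        if (refCnt <= 0) {
            throw new IllegalStateException("refCnt: " + refCnt + ", decrement: 1");
        }
        refCnt--;
    }

    int refCnt() { return refCnt; }
}

public class DoubleReleaseSketch {
    public static void main(String[] args) {
        RefCountedBuf data = new RefCountedBuf(); // refCnt = 1
        data.release(); // redundant release in the catch block after the OOM
        try {
            data.release(); // normal release later in returnBuffer
        } catch (IllegalStateException e) {
            System.out.println("refCnt error: " + e.getMessage());
        }
    }
}
```

Removing the release in the catch block leaves exactly one release (in returnBuffer), which is why the second experiment below sees no refCnt error.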
At the same time, I simulated the scenario where addComponent threw an OutOfDirectMemoryError and data was not released afterwards: no refCnt error occurred, commitFiles succeeded, the Spark task succeeded, and after commitFiles the worker disk buffer count dropped to 0.
[celeborn_1760_followup_worker.log](https://github.com/user-attachments/files/19864486/celeborn_1760_followup_worker.log)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manual test.
Closes #3224 from littlexyw/CELEBORN-1760-FOLLOWUP.
Authored-by: xinyuwang1 <[email protected]>
Signed-off-by: Shuang <[email protected]>
---
.../service/deploy/worker/storage/TierWriter.scala | 27 ++++++++++++----------
1 file changed, 15 insertions(+), 12 deletions(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
index ab7c39a66..edfbff675 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/TierWriter.scala
@@ -316,16 +316,19 @@ class MemoryTierWriter(
override def writeInternal(buf: ByteBuf): Unit = {
buf.retain()
+ val numBytes = buf.readableBytes()
try {
flushBuffer.addComponent(true, buf)
} catch {
case oom: OutOfMemoryError =>
- MemoryManager.instance.releaseMemoryFileStorage(buf.readableBytes())
+ // memory tier writer will not flush
+ // add the bytes into flusher buffer is flush completed
+ metaHandler.afterFlush(numBytes)
+ MemoryManager.instance.incrementMemoryFileStorage(numBytes)
throw oom
}
// memory tier writer will not flush
// add the bytes into flusher buffer is flush completed
- val numBytes = buf.readableBytes()
metaHandler.afterFlush(numBytes)
MemoryManager.instance().incrementMemoryFileStorage(numBytes)
}
@@ -415,15 +418,16 @@ class LocalTierWriter(
buf.retain()
try {
flushBuffer.addComponent(true, buf)
- MemoryManager.instance.incrementDiskBuffer(numBytes)
- if (userCongestionControlContext != null)
- userCongestionControlContext.updateProduceBytes(numBytes)
} catch {
case oom: OutOfMemoryError =>
- buf.release()
- MemoryManager.instance().releaseDiskBuffer(numBytes)
- throw oom;
+ MemoryManager.instance.incrementDiskBuffer(numBytes)
+ if (userCongestionControlContext != null)
+ userCongestionControlContext.updateProduceBytes(numBytes)
+ throw oom
}
+ MemoryManager.instance.incrementDiskBuffer(numBytes)
+ if (userCongestionControlContext != null)
+ userCongestionControlContext.updateProduceBytes(numBytes)
}
override def evict(file: TierWriterBase): Unit = ???
@@ -620,13 +624,12 @@ class DfsTierWriter(
buf.retain()
try {
flushBuffer.addComponent(true, buf)
- MemoryManager.instance.incrementDiskBuffer(numBytes)
} catch {
case oom: OutOfMemoryError =>
- buf.release()
- MemoryManager.instance().releaseDiskBuffer(numBytes)
- throw oom;
+ MemoryManager.instance.incrementDiskBuffer(numBytes)
+ throw oom
}
+ MemoryManager.instance.incrementDiskBuffer(numBytes)
}
override def evict(file: TierWriterBase): Unit = ???