This is an automated email from the ASF dual-hosted git repository.
ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git
The following commit(s) were added to refs/heads/branch-0.3 by this push:
new 0775eb5db [CELEBORN-614][FOLLOWUP] Fix flushOnMemoryPressure condition
0775eb5db is described below
commit 0775eb5db74d3ebdc3725a727d0266b13d071201
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jul 7 15:33:09 2023 +0800
[CELEBORN-614][FOLLOWUP] Fix flushOnMemoryPressure condition
### What changes were proposed in this pull request?
Fix a bug introduced by the CELEBORN-614 refactor
(https://github.com/apache/incubator-celeborn/pull/1517).
### Why are the changes needed?
This is a bug fix. The condition `writer.getException != null` was accidentally
inverted during CELEBORN-614
(https://github.com/apache/incubator-celeborn/pull/1517), which caused the trim
to become a no-op.
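To illustrate why the inverted guard turned the trim into a no-op, here is a minimal sketch. `Writer`, `bufferedBytes` and `TrimGuard` are hypothetical stand-ins for Celeborn's `FileWriter` and its `getException`, not real classes. In the common case where no writer has failed, the buggy guard selects no writers, so nothing is flushed and no memory is released.

```scala
// Hypothetical stand-in for Celeborn's FileWriter: `exception` mirrors
// getException (null means healthy), `bufferedBytes` is data held in memory.
final case class Writer(exception: Throwable, bufferedBytes: Long)

object TrimGuard {
  // Guard as left by CELEBORN-614: only writers that already failed are flushed.
  def buggyShouldFlush(w: Writer): Boolean = w.exception != null

  // Guard after this fix: only healthy writers are flushed.
  def fixedShouldFlush(w: Writer): Boolean = w.exception == null

  // Total buffered bytes a trim pass would try to flush under the given guard.
  def bytesFlushed(writers: Seq[Writer], guard: Writer => Boolean): Long =
    writers.filter(guard).map(_.bufferedBytes).sum
}
```

With two healthy writers buffering 100 and 200 bytes, the buggy guard flushes 0 bytes and the fixed guard flushes 300.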
### Does this PR introduce _any_ user-facing change?
No. The bug was caused by an unreleased commit.
### How was this patch tested?
Set the Worker off-heap memory to 2G and ran a 1T TeraSort.
Before: the trim did not trigger a disk buffer flush, so the worker could not
recover from the pause-pushdata state and the job failed.
After: the trim correctly triggers a disk buffer flush and releases worker
memory, and the job succeeded.
[image: https://github.com/apache/incubator-celeborn/assets/26535726/9ef62c78-e6a9-497f-9dac-d3f712e830cc]
Closes #1689 from pan3793/CELEBORN-614-followup.
Authored-by: Cheng Pan <[email protected]>
Signed-off-by: mingji <[email protected]>
(cherry picked from commit e314f85087eaa2c0761daefa290df11a0ff56f49)
Signed-off-by: mingji <[email protected]>
---
.../celeborn/service/deploy/worker/storage/StorageManager.scala | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 37fe0581d..97095ede1 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -629,7 +629,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
override def accept(t: File, writers: ConcurrentHashMap[String, FileWriter]): Unit = {
writers.forEach(new BiConsumer[String, FileWriter] {
override def accept(file: String, writer: FileWriter): Unit = {
- if (writer.getException != null) {
+ if (writer.getException == null) {
try {
writer.flushOnMemoryPressure()
} catch {
@@ -638,6 +638,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
s"FileWrite of $writer faces unexpected exception when flush on memory pressure.",
t)
}
+ } else {
+ logWarning(s"Skip flushOnMemoryPressure because ${writer.flusher} " +
+ s"has error: ${writer.getException.getMessage}")
}
}
})
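For readers who want to exercise the corrected branch outside a worker, here is a minimal, self-contained sketch of the patched loop. `SketchWriter` and `FlushOnMemoryPressureSketch` are hypothetical stand-ins, not Celeborn classes: flushing is reduced to setting a flag, warnings are collected into a list instead of going through `logWarning`, the message names the file key rather than `writer.flusher`, and `NonFatal` replaces whatever catch pattern the real code uses.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiConsumer
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical, simplified stand-in for Celeborn's FileWriter.
// getException == null means the writer is healthy.
class SketchWriter(val getException: Exception) {
  var flushed = false
  def flushOnMemoryPressure(): Unit = flushed = true
}

object FlushOnMemoryPressureSketch {
  // Mirrors the fixed branch: flush healthy writers, skip failed ones with a
  // warning. Returns the warnings instead of logging them.
  def trim(writers: ConcurrentHashMap[String, SketchWriter]): Seq[String] = {
    val warnings = ArrayBuffer.empty[String]
    // ConcurrentHashMap.forEach(BiConsumer) runs sequentially in this thread,
    // so appending to a local ArrayBuffer is safe here.
    writers.forEach(new BiConsumer[String, SketchWriter] {
      override def accept(file: String, writer: SketchWriter): Unit = {
        if (writer.getException == null) {
          try {
            writer.flushOnMemoryPressure()
          } catch {
            case NonFatal(t) =>
              warnings += s"$file: unexpected exception on flush: ${t.getMessage}"
          }
        } else {
          warnings += s"Skip flushOnMemoryPressure for $file: ${writer.getException.getMessage}"
        }
      }
    })
    warnings.toSeq
  }
}
```

With one healthy writer and one that has already failed, only the healthy writer is flushed and a single skip warning is produced.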