This is an automated email from the ASF dual-hosted git repository.

ethanfeng pushed a commit to branch branch-0.3
in repository https://gitbox.apache.org/repos/asf/incubator-celeborn.git


The following commit(s) were added to refs/heads/branch-0.3 by this push:
     new 0775eb5db [CELEBORN-614][FOLLOWUP] Fix flushOnMemoryPressure condition
0775eb5db is described below

commit 0775eb5db74d3ebdc3725a727d0266b13d071201
Author: Cheng Pan <[email protected]>
AuthorDate: Fri Jul 7 15:33:09 2023 +0800

    [CELEBORN-614][FOLLOWUP] Fix flushOnMemoryPressure condition
    
    ### What changes were proposed in this pull request?
    
    Fix a bug introduced by the CELEBORN-614 refactor (https://github.com/apache/incubator-celeborn/pull/1517).
    
    ### Why are the changes needed?
    
    This is a bug fix: during CELEBORN-614 (https://github.com/apache/incubator-celeborn/pull/1517) the condition was accidentally inverted to `writer.getException != null`, so healthy writers were never flushed and the trim became effectively a no-op.
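
    The effect of the inverted guard can be illustrated with a minimal Scala sketch. `SketchWriter`, `GuardSketch`, `filesToFlush`, and the `shuffle-*` file names are hypothetical stand-ins, not Celeborn classes; only the `getException` null check mirrors the patched code. With the inverted guard, only the writer that has already failed is selected, so no healthy writer is ever flushed.

```scala
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiConsumer
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for Celeborn's FileWriter: only the exception slot
// that the guard inspects is modelled. It is not the real class.
final class SketchWriter(error: IOException) {
  def getException: IOException = error
}

object GuardSketch {
  // One trim pass: returns, sorted, the files whose writers would be flushed.
  // `inverted = true` reproduces the accidental `!= null` guard.
  def filesToFlush(writers: ConcurrentHashMap[String, SketchWriter], inverted: Boolean): List[String] = {
    val selected = ListBuffer.empty[String]
    writers.forEach(new BiConsumer[String, SketchWriter] {
      override def accept(file: String, writer: SketchWriter): Unit = {
        val shouldFlush =
          if (inverted) writer.getException != null // buggy guard: selects only failed writers
          else writer.getException == null          // fixed guard: selects only healthy writers
        if (shouldFlush) selected += file
      }
    })
    selected.toList.sorted
  }

  // Two healthy writers and one writer that has already recorded an error.
  def demoWriters(): ConcurrentHashMap[String, SketchWriter] = {
    val m = new ConcurrentHashMap[String, SketchWriter]()
    m.put("shuffle-0", new SketchWriter(null))
    m.put("shuffle-1", new SketchWriter(null))
    m.put("shuffle-2", new SketchWriter(new IOException("disk failure")))
    m
  }

  def main(args: Array[String]): Unit = {
    println(filesToFlush(demoWriters(), inverted = true))  // List(shuffle-2)
    println(filesToFlush(demoWriters(), inverted = false)) // List(shuffle-0, shuffle-1)
  }
}
```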
    
    ### Does this PR introduce _any_ user-facing change?
    
    No. The bug was caused by an unreleased commit.
    
    ### How was this patch tested?
    
    Set the Worker off-heap memory to 2G and ran a 1T TeraSort.
    
    Before: the trim did not trigger a disk buffer flush, so the worker could not recover from the pause-pushdata state and the job failed.
    
    After: the trim correctly triggered a disk buffer flush and released worker memory, and the job succeeded.
    
    <img width="1653" alt="image" src="https://github.com/apache/incubator-celeborn/assets/26535726/9ef62c78-e6a9-497f-9dac-d3f712e830cc">
    
    Closes #1689 from pan3793/CELEBORN-614-followup.
    
    Authored-by: Cheng Pan <[email protected]>
    Signed-off-by: mingji <[email protected]>
    (cherry picked from commit e314f85087eaa2c0761daefa290df11a0ff56f49)
    Signed-off-by: mingji <[email protected]>
---
 .../celeborn/service/deploy/worker/storage/StorageManager.scala      | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
index 37fe0581d..97095ede1 100644
--- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
+++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala
@@ -629,7 +629,7 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
       override def accept(t: File, writers: ConcurrentHashMap[String, FileWriter]): Unit = {
         writers.forEach(new BiConsumer[String, FileWriter] {
           override def accept(file: String, writer: FileWriter): Unit = {
-            if (writer.getException != null) {
+            if (writer.getException == null) {
               try {
                 writer.flushOnMemoryPressure()
               } catch {
@@ -638,6 +638,9 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs
                     s"FileWrite of $writer faces unexpected exception when flush on memory pressure.",
                     t)
               }
+            } else {
+              logWarning(s"Skip flushOnMemoryPressure because ${writer.flusher} " +
+                s"has error: ${writer.getException.getMessage}")
             }
           }
         })
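
A self-contained sketch of the patched control flow, assuming a hypothetical `PressureWriter` trait and `FixedTrimSketch` object (neither is a Celeborn API): healthy writers are flushed, writers that already hold an exception are skipped with the reason recorded (mirroring the new `logWarning` branch), and a flush failure on one writer does not stop the pass for the others. The hunk elides the real `catch` clause, so catching `IOException` here is an assumption.

```scala
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import java.util.function.BiConsumer
import scala.collection.mutable.ListBuffer

// Hypothetical writer interface; not Celeborn's FileWriter.
trait PressureWriter {
  def getException: IOException
  def flushOnMemoryPressure(): Unit
}

object FixedTrimSketch {
  final case class Report(flushed: List[String], skipped: List[String], failed: List[String])

  // One trim pass using the fixed `getException == null` guard.
  def trimPass(writers: ConcurrentHashMap[String, PressureWriter]): Report = {
    val flushed = ListBuffer.empty[String]
    val skipped = ListBuffer.empty[String]
    val failed = ListBuffer.empty[String]
    writers.forEach(new BiConsumer[String, PressureWriter] {
      override def accept(file: String, writer: PressureWriter): Unit = {
        if (writer.getException == null) {
          try {
            writer.flushOnMemoryPressure()
            flushed += file
          } catch {
            // Assumed exception type: one failing flush must not abort the whole pass.
            case _: IOException => failed += file
          }
        } else {
          // Mirrors the new else branch: record why this writer was skipped.
          skipped += s"$file: ${writer.getException.getMessage}"
        }
      }
    })
    Report(flushed.toList.sorted, skipped.toList.sorted, failed.toList.sorted)
  }

  // Builds a hypothetical writer with a preset error and flush behaviour.
  private def mkWriter(err: IOException, flushFails: Boolean): PressureWriter = new PressureWriter {
    def getException: IOException = err
    def flushOnMemoryPressure(): Unit = if (flushFails) throw new IOException("flush failed")
  }

  def demoWriters(): ConcurrentHashMap[String, PressureWriter] = {
    val m = new ConcurrentHashMap[String, PressureWriter]()
    m.put("shuffle-0", mkWriter(null, flushFails = false))
    m.put("shuffle-1", mkWriter(null, flushFails = true))
    m.put("shuffle-2", mkWriter(new IOException("disk failure"), flushFails = false))
    m
  }

  def main(args: Array[String]): Unit = {
    // Report(List(shuffle-0),List(shuffle-2: disk failure),List(shuffle-1))
    println(trimPass(demoWriters()))
  }
}
```

Collecting skips into a report instead of logging keeps the sketch testable; in the worker itself the skip is only reported through `logWarning`.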
