This is an automated email from the ASF dual-hosted git repository.

zhouky pushed a commit to branch branch-0.5
in repository https://gitbox.apache.org/repos/asf/celeborn.git


The following commit(s) were added to refs/heads/branch-0.5 by this push:
     new 244d39efe [CELEBORN-1544] ShuffleWriter needs to call close finally to 
avoid memory leaks
244d39efe is described below

commit 244d39efe992a69ed57d130a4de58a7dde96503e
Author: sychen <[email protected]>
AuthorDate: Fri Aug 2 20:24:31 2024 +0800

    [CELEBORN-1544] ShuffleWriter needs to call close finally to avoid memory 
leaks
    
    ### What changes were proposed in this pull request?
    This PR aims to fix a possible memory leak in ShuffleWriter.
    
    ### Why are the changes needed?
    When `spark.speculation=true` is enabled or the executing SQL is killed, the 
task may be interrupted. In that case, `ShuffleWriter` may never call `close()`.
    As a result, `DataPusher#idleQueue` keeps occupying memory ( 
`celeborn.client.push.buffer.max.size` * `celeborn.client.push.queue.capacity` 
) and the `DataPusher` instance is never released.
    
    ```java
    Thread 537 (DataPusher-78931):
      State: TIMED_WAITING
      Blocked count: 0
      Waited count: 16337
      IsDaemon: true
      Stack:
        java.lang.Thread.sleep(Native Method)
        org.apache.celeborn.client.write.DataPushQueue.takePushTasks(DataPushQueue.java:135)
        org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:122)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Production testing
    
    #### Current
    <img width="547" alt="image" 
src="https://github.com/user-attachments/assets/d6f64257-144e-4139-96c6-518ca5f1bfd2">
    
    #### PR
    <img width="479" alt="image" 
src="https://github.com/user-attachments/assets/e4ff62ec-5b9d-47a4-a36c-1d13bf378cbc">
    
    Closes #2661 from cxzl25/CELEBORN-1544.
    
    Authored-by: sychen <[email protected]>
    Signed-off-by: zky.zhoukeyong <[email protected]>
    (cherry picked from commit 48ce9b9b497ac2346e8a2fc32d7a369da0f181fa)
    Signed-off-by: zky.zhoukeyong <[email protected]>
---
 .../shuffle/celeborn/HashBasedShuffleWriter.java   |  7 ++++++-
 .../shuffle/celeborn/SortBasedShuffleWriter.java   | 23 ++++++++++++----------
 .../shuffle/celeborn/HashBasedShuffleWriter.java   |  7 ++++++-
 .../shuffle/celeborn/SortBasedShuffleWriter.java   |  7 +++++--
 4 files changed, 30 insertions(+), 14 deletions(-)

diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 407b32849..0986c23ef 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -174,9 +174,14 @@ public class HashBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       } else {
         write0(records);
       }
-      close();
     } catch (InterruptedException e) {
       TaskInterruptedHelper.throwTaskKillException();
+    } finally {
+      try {
+        close();
+      } catch (InterruptedException e) {
+        TaskInterruptedHelper.throwTaskKillException();
+      }
     }
   }
 
diff --git 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 2479d19cf..5ecf1edba 100644
--- 
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ 
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -143,18 +143,21 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
 
   @Override
   public void write(scala.collection.Iterator<Product2<K, V>> records) throws 
IOException {
-    if (canUseFastWrite()) {
-      fastWrite0(records);
-    } else if (dep.mapSideCombine()) {
-      if (dep.aggregator().isEmpty()) {
-        throw new UnsupportedOperationException(
-            "When using map side combine, an aggregator must be specified.");
+    try {
+      if (canUseFastWrite()) {
+        fastWrite0(records);
+      } else if (dep.mapSideCombine()) {
+        if (dep.aggregator().isEmpty()) {
+          throw new UnsupportedOperationException(
+              "When using map side combine, an aggregator must be specified.");
+        }
+        write0(dep.aggregator().get().combineValuesByKey(records, 
taskContext));
+      } else {
+        write0(records);
       }
-      write0(dep.aggregator().get().combineValuesByKey(records, taskContext));
-    } else {
-      write0(records);
+    } finally {
+      close();
     }
-    close();
   }
 
   @VisibleForTesting
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 34377ee4a..5a9b04552 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -170,9 +170,14 @@ public class HashBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
       } else {
         write0(records);
       }
-      close();
     } catch (InterruptedException e) {
       TaskInterruptedHelper.throwTaskKillException();
+    } finally {
+      try {
+        close();
+      } catch (InterruptedException e) {
+        TaskInterruptedHelper.throwTaskKillException();
+      }
     }
   }
 
diff --git 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 2b810b190..f3b856394 100644
--- 
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++ 
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -226,8 +226,11 @@ public class SortBasedShuffleWriter<K, V, C> extends 
ShuffleWriter<K, V> {
 
   @Override
   public void write(scala.collection.Iterator<Product2<K, V>> records) throws 
IOException {
-    doWrite(records);
-    close();
+    try {
+      doWrite(records);
+    } finally {
+      close();
+    }
   }
 
   @VisibleForTesting

Reply via email to