This is an automated email from the ASF dual-hosted git repository.
zhouky pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/celeborn.git
The following commit(s) were added to refs/heads/main by this push:
new 48ce9b9b4 [CELEBORN-1544] ShuffleWriter needs to call close finally to
avoid memory leaks
48ce9b9b4 is described below
commit 48ce9b9b497ac2346e8a2fc32d7a369da0f181fa
Author: sychen <[email protected]>
AuthorDate: Fri Aug 2 20:24:31 2024 +0800
[CELEBORN-1544] ShuffleWriter needs to call close finally to avoid memory
leaks
### What changes were proposed in this pull request?
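This PR fixes a potential memory leak in `ShuffleWriter` by moving the `close()` call into a `finally` block, so that it also runs when `write` is interrupted or throws.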
### Why are the changes needed?
When `spark.speculation=true` is enabled, or when a running SQL query is
killed, the task may be interrupted. In that case `ShuffleWriter#close` may
never be called, so `DataPusher#idleQueue` keeps holding its push buffers (up to
`celeborn.client.push.buffer.max.size` * `celeborn.client.push.queue.capacity`)
and the `DataPusher` instance is never released. Its push thread also stays
alive, as the following thread dump shows:
```java
Thread 537 (DataPusher-78931):
State: TIMED_WAITING
Blocked count: 0
Waited count: 16337
IsDaemon: true
Stack:
java.lang.Thread.sleep(Native Method)
org.apache.celeborn.client.write.DataPushQueue.takePushTasks(DataPushQueue.java:135)
org.apache.celeborn.client.write.DataPusher$1.run(DataPusher.java:122)
```
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
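Tested in a production environment; the screenshots below compare the behavior before this change ("Current") and with it ("PR").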
#### Current
<img width="547" alt="image"
src="https://github.com/user-attachments/assets/d6f64257-144e-4139-96c6-518ca5f1bfd2">
#### PR
<img width="479" alt="image"
src="https://github.com/user-attachments/assets/e4ff62ec-5b9d-47a4-a36c-1d13bf378cbc">
Closes #2661 from cxzl25/CELEBORN-1544.
Authored-by: sychen <[email protected]>
Signed-off-by: zky.zhoukeyong <[email protected]>
---
.../shuffle/celeborn/HashBasedShuffleWriter.java | 7 ++++++-
.../shuffle/celeborn/SortBasedShuffleWriter.java | 23 ++++++++++++----------
.../shuffle/celeborn/HashBasedShuffleWriter.java | 7 ++++++-
.../shuffle/celeborn/SortBasedShuffleWriter.java | 7 +++++--
4 files changed, 30 insertions(+), 14 deletions(-)
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 407b32849..0986c23ef 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -174,9 +174,14 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
} else {
write0(records);
}
- close();
} catch (InterruptedException e) {
TaskInterruptedHelper.throwTaskKillException();
+ } finally {
+ try {
+ close();
+ } catch (InterruptedException e) {
+ TaskInterruptedHelper.throwTaskKillException();
+ }
}
}
diff --git
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 2479d19cf..5ecf1edba 100644
---
a/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++
b/client-spark/spark-2/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -143,18 +143,21 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws
IOException {
- if (canUseFastWrite()) {
- fastWrite0(records);
- } else if (dep.mapSideCombine()) {
- if (dep.aggregator().isEmpty()) {
- throw new UnsupportedOperationException(
- "When using map side combine, an aggregator must be specified.");
+ try {
+ if (canUseFastWrite()) {
+ fastWrite0(records);
+ } else if (dep.mapSideCombine()) {
+ if (dep.aggregator().isEmpty()) {
+ throw new UnsupportedOperationException(
+ "When using map side combine, an aggregator must be specified.");
+ }
+ write0(dep.aggregator().get().combineValuesByKey(records,
taskContext));
+ } else {
+ write0(records);
}
- write0(dep.aggregator().get().combineValuesByKey(records, taskContext));
- } else {
- write0(records);
+ } finally {
+ close();
}
- close();
}
@VisibleForTesting
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
index 34377ee4a..5a9b04552 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/HashBasedShuffleWriter.java
@@ -170,9 +170,14 @@ public class HashBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
} else {
write0(records);
}
- close();
} catch (InterruptedException e) {
TaskInterruptedHelper.throwTaskKillException();
+ } finally {
+ try {
+ close();
+ } catch (InterruptedException e) {
+ TaskInterruptedHelper.throwTaskKillException();
+ }
}
}
diff --git
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
index 2b810b190..f3b856394 100644
---
a/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
+++
b/client-spark/spark-3/src/main/java/org/apache/spark/shuffle/celeborn/SortBasedShuffleWriter.java
@@ -226,8 +226,11 @@ public class SortBasedShuffleWriter<K, V, C> extends
ShuffleWriter<K, V> {
@Override
public void write(scala.collection.Iterator<Product2<K, V>> records) throws
IOException {
- doWrite(records);
- close();
+ try {
+ doWrite(records);
+ } finally {
+ close();
+ }
}
@VisibleForTesting