This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new aab887dad [#2061] fix(server): Potential Netty memory leak when
removeBuffer and cacheShuffleData happen at the same time (#2059)
aab887dad is described below
commit aab887dade4eeca87abc8ea713556a5599d5726f
Author: leewish <[email protected]>
AuthorDate: Tue Oct 15 11:57:24 2024 +0800
[#2061] fix(server): Potential Netty memory leak when removeBuffer and
cacheShuffleData happen at the same time (#2059)
### What changes were proposed in this pull request?
Fix a Netty memory leak when removeBuffer and cacheShuffleData happen
concurrently
### Why are the changes needed?
Fix: https://github.com/apache/incubator-uniffle/issues/2061
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Existing tests
---
Co-authored-by: wenlongwlli <[email protected]>
---
.../server/buffer/AbstractShuffleBuffer.java | 4 +++
.../server/buffer/ShuffleBufferManager.java | 3 +++
.../server/buffer/ShuffleBufferWithLinkedList.java | 30 ++++++++++++----------
.../server/buffer/ShuffleBufferWithSkipList.java | 18 +++++++------
4 files changed, 33 insertions(+), 22 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index f520603da..c0f880b2d 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -44,8 +44,12 @@ public abstract class AbstractShuffleBuffer implements
ShuffleBuffer {
protected AtomicLong inFlushSize = new AtomicLong();
+ protected volatile boolean evicted;
+ public static final long BUFFER_EVICTED = -1L;
+
public AbstractShuffleBuffer() {
this.size = 0;
+ this.evicted = false;
}
/** Only for test */
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index aac9be42e..7ad5e40ab 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -219,6 +219,9 @@ public class ShuffleBufferManager {
ShuffleBuffer buffer = entry.getValue();
long size = buffer.append(spd);
+ if (size == AbstractShuffleBuffer.BUFFER_EVICTED) {
+ return StatusCode.NO_REGISTER;
+ }
if (!isPreAllocated) {
updateUsedMemory(size);
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index 9597cbfe3..aa4cc2010 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -50,23 +50,24 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
}
@Override
- public long append(ShufflePartitionedData data) {
- long size = 0;
+ public synchronized long append(ShufflePartitionedData data) {
+ if (evicted) {
+ return BUFFER_EVICTED;
+ }
+ long currentSize = 0;
- synchronized (this) {
- for (ShufflePartitionedBlock block : data.getBlockList()) {
- // If sendShuffleData retried, we may receive duplicate block. The
duplicate
- // block would gc without release. Here we must release the duplicated
block.
- if (blocks.add(block)) {
- size += block.getEncodedLength();
- } else {
- block.getData().release();
- }
+ for (ShufflePartitionedBlock block : data.getBlockList()) {
+ // If sendShuffleData retried, we may receive duplicate block. The
duplicate
+ // block would gc without release. Here we must release the duplicated
block.
+ if (blocks.add(block)) {
+ currentSize += block.getEncodedLength();
+ } else {
+ block.getData().release();
}
- this.size += size;
}
+ this.size += currentSize;
- return size;
+ return currentSize;
}
@Override
@@ -119,10 +120,11 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
}
@Override
- public long release() {
+ public synchronized long release() {
Throwable lastException = null;
int failedToReleaseSize = 0;
long releasedSize = 0;
+ evicted = true;
for (ShufflePartitionedBlock spb : blocks) {
try {
spb.getData().release();
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 50e6d686f..726c30d3a 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -58,25 +58,26 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
}
@Override
- public long append(ShufflePartitionedData data) {
- long size = 0;
+ public synchronized long append(ShufflePartitionedData data) {
+ if (evicted) {
+ return BUFFER_EVICTED;
+ }
+ long currentSize = 0;
- synchronized (this) {
for (ShufflePartitionedBlock block : data.getBlockList()) {
// If sendShuffleData retried, we may receive duplicate block. The
duplicate
// block would gc without release. Here we must release the duplicated
block.
if (!blocksMap.containsKey(block.getBlockId())) {
blocksMap.put(block.getBlockId(), block);
blockCount++;
- size += block.getEncodedLength();
+ currentSize += block.getEncodedLength();
} else {
block.getData().release();
}
}
- this.size += size;
- }
+ this.size += currentSize;
- return size;
+ return currentSize;
}
@Override
@@ -120,10 +121,11 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
}
@Override
- public long release() {
+ public synchronized long release() {
Throwable lastException = null;
int failedToReleaseSize = 0;
long releasedSize = 0;
+ evicted = true;
for (ShufflePartitionedBlock spb : blocksMap.values()) {
try {
spb.getData().release();