This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new aab887dad [#2061] fix(server): Potential Netty memory leak when 
removeBuffer and cacheShuffleData happen at the same time (#2059)
aab887dad is described below

commit aab887dade4eeca87abc8ea713556a5599d5726f
Author: leewish <[email protected]>
AuthorDate: Tue Oct 15 11:57:24 2024 +0800

    [#2061] fix(server): Potential Netty memory leak when removeBuffer and 
cacheShuffleData happen at the same time (#2059)
    
    ### What changes were proposed in this pull request?
    
     Fix Netty memory leak when removeBuffer and cacheShuffleData happen concurrently
    
    ### Why are the changes needed?
    
    Fix: https://github.com/apache/incubator-uniffle/issues/2061
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests
    
    ---
    
    Co-authored-by: wenlongwlli <[email protected]>
---
 .../server/buffer/AbstractShuffleBuffer.java       |  4 +++
 .../server/buffer/ShuffleBufferManager.java        |  3 +++
 .../server/buffer/ShuffleBufferWithLinkedList.java | 30 ++++++++++++----------
 .../server/buffer/ShuffleBufferWithSkipList.java   | 18 +++++++------
 4 files changed, 33 insertions(+), 22 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index f520603da..c0f880b2d 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -44,8 +44,12 @@ public abstract class AbstractShuffleBuffer implements 
ShuffleBuffer {
 
   protected AtomicLong inFlushSize = new AtomicLong();
 
+  protected volatile boolean evicted;
+  public static final long BUFFER_EVICTED = -1L;
+
   public AbstractShuffleBuffer() {
     this.size = 0;
+    this.evicted = false;
   }
 
   /** Only for test */
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index aac9be42e..7ad5e40ab 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -219,6 +219,9 @@ public class ShuffleBufferManager {
 
     ShuffleBuffer buffer = entry.getValue();
     long size = buffer.append(spd);
+    if (size == AbstractShuffleBuffer.BUFFER_EVICTED) {
+      return StatusCode.NO_REGISTER;
+    }
     if (!isPreAllocated) {
       updateUsedMemory(size);
     }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index 9597cbfe3..aa4cc2010 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -50,23 +50,24 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
   }
 
   @Override
-  public long append(ShufflePartitionedData data) {
-    long size = 0;
+  public synchronized long append(ShufflePartitionedData data) {
+    if (evicted) {
+      return BUFFER_EVICTED;
+    }
+    long currentSize = 0;
 
-    synchronized (this) {
-      for (ShufflePartitionedBlock block : data.getBlockList()) {
-        // If sendShuffleData retried, we may receive duplicate block. The 
duplicate
-        // block would gc without release. Here we must release the duplicated 
block.
-        if (blocks.add(block)) {
-          size += block.getEncodedLength();
-        } else {
-          block.getData().release();
-        }
+    for (ShufflePartitionedBlock block : data.getBlockList()) {
+      // If sendShuffleData retried, we may receive duplicate block. The 
duplicate
+      // block would gc without release. Here we must release the duplicated 
block.
+      if (blocks.add(block)) {
+        currentSize += block.getEncodedLength();
+      } else {
+        block.getData().release();
       }
-      this.size += size;
     }
+    this.size += currentSize;
 
-    return size;
+    return currentSize;
   }
 
   @Override
@@ -119,10 +120,11 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
   }
 
   @Override
-  public long release() {
+  public synchronized long release() {
     Throwable lastException = null;
     int failedToReleaseSize = 0;
     long releasedSize = 0;
+    evicted = true;
     for (ShufflePartitionedBlock spb : blocks) {
       try {
         spb.getData().release();
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 50e6d686f..726c30d3a 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -58,25 +58,26 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
   }
 
   @Override
-  public long append(ShufflePartitionedData data) {
-    long size = 0;
+  public synchronized long append(ShufflePartitionedData data) {
+    if (evicted) {
+      return BUFFER_EVICTED;
+    }
+    long currentSize = 0;
 
-    synchronized (this) {
       for (ShufflePartitionedBlock block : data.getBlockList()) {
         // If sendShuffleData retried, we may receive duplicate block. The 
duplicate
         // block would gc without release. Here we must release the duplicated 
block.
         if (!blocksMap.containsKey(block.getBlockId())) {
           blocksMap.put(block.getBlockId(), block);
           blockCount++;
-          size += block.getEncodedLength();
+          currentSize += block.getEncodedLength();
         } else {
           block.getData().release();
         }
       }
-      this.size += size;
-    }
+      this.size += currentSize;
 
-    return size;
+    return currentSize;
   }
 
   @Override
@@ -120,10 +121,11 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
   }
 
   @Override
-  public long release() {
+  public synchronized long release() {
     Throwable lastException = null;
     int failedToReleaseSize = 0;
     long releasedSize = 0;
+    evicted = true;
     for (ShufflePartitionedBlock spb : blocksMap.values()) {
       try {
         spb.getData().release();

Reply via email to