This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 7867d59b1 [#2701] fix(server): release the memory of duplicate blocks 
(#2702)
7867d59b1 is described below

commit 7867d59b1b15d1021cb15528db89e64e42508d3b
Author: xianjingfeng <[email protected]>
AuthorDate: Wed Dec 24 10:36:58 2025 +0800

    [#2701] fix(server): release the memory of duplicate blocks (#2702)
    
    ### What changes were proposed in this pull request?
    Release the memory of duplicate blocks.
    
    ### Why are the changes needed?
    The used_buffer_size is incorrect when duplicate blocks occur.
    Fix: #2701
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Unit test (ShuffleBufferManagerTest#cacheDuplicateBlockTest).
---
 .../uniffle/server/buffer/ShuffleBufferManager.java | 11 +++++++++++
 .../server/buffer/ShuffleBufferManagerTest.java     | 21 +++++++++++++++++++++
 2 files changed, 32 insertions(+)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 5cfbcb12f..ed9331291 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -290,6 +290,14 @@ public class ShuffleBufferManager {
     }
     if (!isPreAllocated) {
       updateUsedMemory(size);
+    } else {
+      // We need to release the memory that has been occupied by the duplicate 
block,
+      // because the EncodedLength of duplicate blocks will not be added to
+      // the EncodedLength of the ShuffleBuffer, so this part of memory will 
not
+      // be released when the ShuffleBuffer triggers a flush.
+      if (spd.getTotalBlockEncodedLength() > size) {
+        releaseMemory(spd.getTotalBlockEncodedLength() - size, false, false);
+      }
     }
     if (appBlockSizeMetricEnabled) {
       Arrays.stream(spd.getBlockList())
@@ -561,6 +569,9 @@ public class ShuffleBufferManager {
 
   public void releaseMemory(
       long size, boolean isReleaseFlushMemory, boolean isReleasePreAllocation) 
{
+    if (size == 0) {
+      return;
+    }
     if (usedMemory.get() >= size) {
       usedMemory.addAndGet(-size);
     } else {
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 7b81926e0..7dbba94b2 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -874,4 +874,25 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
   protected ShuffleBuffer createShuffleBuffer() {
     return null;
   }
+
+  @Test
+  public void cacheDuplicateBlockTest() {
+    String appId = "cacheDuplicateBlockTest";
+    shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
+    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+    
when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(rwLock.readLock());
+    
when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);
+    int shuffleId = 1;
+
+    shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
+    ShufflePartitionedData data = createData(0, 16);
+    shuffleBufferManager.requireMemory(48, true);
+    StatusCode sc = shuffleBufferManager.cacheShuffleData(appId, shuffleId, 
true, data);
+    assertEquals(StatusCode.SUCCESS, sc);
+    assertEquals(48, shuffleBufferManager.getUsedMemory());
+    shuffleBufferManager.requireMemory(48, true);
+    sc = shuffleBufferManager.cacheShuffleData(appId, shuffleId, true, data);
+    assertEquals(StatusCode.SUCCESS, sc);
+    assertEquals(48, shuffleBufferManager.getUsedMemory());
+  }
 }

Reply via email to