This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 7867d59b1 [#2701] fix(server): release the memory of duplicate blocks (#2702)
7867d59b1 is described below
commit 7867d59b1b15d1021cb15528db89e64e42508d3b
Author: xianjingfeng <[email protected]>
AuthorDate: Wed Dec 24 10:36:58 2025 +0800
[#2701] fix(server): release the memory of duplicate blocks (#2702)
### What changes were proposed in this pull request?
Release the memory of duplicate blocks.
### Why are the changes needed?
The used_buffer_size is incorrect when duplicate blocks occur.
Fix: #2701
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
UT
---
.../uniffle/server/buffer/ShuffleBufferManager.java | 11 +++++++++++
.../server/buffer/ShuffleBufferManagerTest.java | 21 +++++++++++++++++++++
2 files changed, 32 insertions(+)
diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 5cfbcb12f..ed9331291 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -290,6 +290,14 @@ public class ShuffleBufferManager {
}
if (!isPreAllocated) {
updateUsedMemory(size);
+ } else {
+ // We need to release the memory that has been occupied by the duplicate block,
+ // because the EncodedLength of duplicate blocks will not be added to
+ // the EncodedLength of the ShuffleBuffer, so this part of memory will not
+ // be released when the ShuffleBuffer triggers a flush.
+ if (spd.getTotalBlockEncodedLength() > size) {
+ releaseMemory(spd.getTotalBlockEncodedLength() - size, false, false);
+ }
}
if (appBlockSizeMetricEnabled) {
Arrays.stream(spd.getBlockList())
@@ -561,6 +569,9 @@ public class ShuffleBufferManager {
public void releaseMemory(
long size, boolean isReleaseFlushMemory, boolean isReleasePreAllocation) {
+ if (size == 0) {
+ return;
+ }
if (usedMemory.get() >= size) {
usedMemory.addAndGet(-size);
} else {
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 7b81926e0..7dbba94b2 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -874,4 +874,25 @@ public class ShuffleBufferManagerTest extends BufferTestBase {
protected ShuffleBuffer createShuffleBuffer() {
return null;
}
+
+ @Test
+ public void cacheDuplicateBlockTest() {
+ String appId = "cacheDuplicateBlockTest";
+ shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
+ ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+ when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(rwLock.readLock());
+ when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);
+ int shuffleId = 1;
+
+ shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
+ ShufflePartitionedData data = createData(0, 16);
+ shuffleBufferManager.requireMemory(48, true);
+ StatusCode sc = shuffleBufferManager.cacheShuffleData(appId, shuffleId, true, data);
+ assertEquals(StatusCode.SUCCESS, sc);
+ assertEquals(48, shuffleBufferManager.getUsedMemory());
+ shuffleBufferManager.requireMemory(48, true);
+ sc = shuffleBufferManager.cacheShuffleData(appId, shuffleId, true, data);
+ assertEquals(StatusCode.SUCCESS, sc);
+ assertEquals(48, shuffleBufferManager.getUsedMemory());
+ }
}