This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 42933f479 [#1717] improvement: Flush a part of partitions if the
shuffle size too big (#1718)
42933f479 is described below
commit 42933f479e7ded8e40a5edd7bd967751bd01380f
Author: xianjingfeng <[email protected]>
AuthorDate: Thu Jun 6 14:28:58 2024 +0800
[#1717] improvement: Flush a part of partitions if the shuffle size too big
(#1718)
### What changes were proposed in this pull request?
Flush a part of partitions if the shuffle size too bigger
### Why are the changes needed?
For better performance.
Fix: #1717
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing UTs
---
.../uniffle/server/buffer/ShuffleBufferManager.java | 16 +++++++++++++---
.../uniffle/server/buffer/ShuffleBufferManagerTest.java | 17 ++++++++++++++---
2 files changed, 27 insertions(+), 6 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index fd78d898d..6ec8c13e6 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -486,8 +486,12 @@ public class ShuffleBufferManager {
}
}
- // flush the buffer with required map which is <appId -> shuffleId>
- public synchronized void flush(Map<String, Set<Integer>> requiredFlush) {
+ // Flush the buffer with required map which is <appId -> shuffleId>.
+ // If the total size of the shuffles picked is bigger than the expected
flush size,
+ // it will just flush a part of partitions.
+ private synchronized void flush(Map<String, Set<Integer>> requiredFlush) {
+ long pickedFlushSize = 0L;
+ long expectedFlushSize = highWaterMark - lowWaterMark;
for (Map.Entry<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>>
appIdToBuffers :
bufferPool.entrySet()) {
String appId = appIdToBuffers.getKey();
@@ -500,13 +504,19 @@ public class ShuffleBufferManager {
for (Map.Entry<Range<Integer>, ShuffleBuffer> rangeEntry :
shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) {
Range<Integer> range = rangeEntry.getKey();
+ ShuffleBuffer shuffleBuffer = rangeEntry.getValue();
+ pickedFlushSize += shuffleBuffer.getSize();
flushBuffer(
- rangeEntry.getValue(),
+ shuffleBuffer,
appId,
shuffleId,
range.lowerEndpoint(),
range.upperEndpoint(),
isHugePartition(appId, shuffleId, range.lowerEndpoint()));
+ if (pickedFlushSize > expectedFlushSize) {
+ LOG.info("Already picked enough buffers to flush {} bytes",
pickedFlushSize);
+ return;
+ }
}
}
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 56d218980..8e958b22b 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -447,10 +447,11 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
shuffleBufferManager.cacheShuffleData(appId, shuffleId, false,
createData(6, 64));
assertEquals(384, shuffleBufferManager.getUsedMemory());
shuffleBufferManager.cacheShuffleData(appId, shuffleId, false,
createData(8, 64));
- waitForFlush(shuffleFlushManager, appId, shuffleId, 5);
- assertEquals(0, shuffleBufferManager.getUsedMemory());
+ waitForFlush(shuffleFlushManager, appId, shuffleId, 4, 96);
assertEquals(0, shuffleBufferManager.getInFlushSize());
+ shuffleBufferManager.removeBuffer(appId);
+ shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
shuffleBufferManager.registerBuffer("bufferSizeTest1", shuffleId, 0, 1);
shuffleBufferManager.cacheShuffleData(appId, shuffleId, false,
createData(0, 32));
assertEquals(64, shuffleBufferManager.getUsedMemory());
@@ -630,6 +631,16 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
private void waitForFlush(
ShuffleFlushManager shuffleFlushManager, String appId, int shuffleId,
int expectedBlockNum)
throws Exception {
+ waitForFlush(shuffleFlushManager, appId, shuffleId, expectedBlockNum, 0);
+ }
+
+ private void waitForFlush(
+ ShuffleFlushManager shuffleFlushManager,
+ String appId,
+ int shuffleId,
+ int expectedBlockNum,
+ long expectedUsedMemory)
+ throws Exception {
int retry = 0;
long committedCount = 0;
do {
@@ -649,7 +660,7 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
// `shuffleBufferManager.getUsedMemory()` and
`shuffleBufferManager.getInFlushSize()`.
Awaitility.await()
.atMost(Duration.ofSeconds(5))
- .until(() -> shuffleBufferManager.getUsedMemory() == 0);
+ .until(() -> shuffleBufferManager.getUsedMemory() ==
expectedUsedMemory);
}
@Test