This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 42933f479 [#1717] improvement: Flush a part of partitions if the 
shuffle size too big (#1718)
42933f479 is described below

commit 42933f479e7ded8e40a5edd7bd967751bd01380f
Author: xianjingfeng <[email protected]>
AuthorDate: Thu Jun 6 14:28:58 2024 +0800

    [#1717] improvement: Flush a part of partitions if the shuffle size too big 
(#1718)
    
    ### What changes were proposed in this pull request?
    Flush only a part of the partitions if the shuffle size is too big: stop
    picking buffers once the picked size exceeds (highWaterMark - lowWaterMark).
    
    ### Why are the changes needed?
    For better performance.
    Fix: #1717
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Existing UTs
---
 .../uniffle/server/buffer/ShuffleBufferManager.java     | 16 +++++++++++++---
 .../uniffle/server/buffer/ShuffleBufferManagerTest.java | 17 ++++++++++++++---
 2 files changed, 27 insertions(+), 6 deletions(-)

diff --git a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index fd78d898d..6ec8c13e6 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -486,8 +486,12 @@ public class ShuffleBufferManager {
     }
   }
 
-  // flush the buffer with required map which is <appId -> shuffleId>
-  public synchronized void flush(Map<String, Set<Integer>> requiredFlush) {
+  // Flush the buffer with required map which is <appId -> shuffleId>.
+  // If the total size of the shuffles picked is bigger than the expected flush size,
+  // it will just flush a part of partitions.
+  private synchronized void flush(Map<String, Set<Integer>> requiredFlush) {
+    long pickedFlushSize = 0L;
+    long expectedFlushSize = highWaterMark - lowWaterMark;
     for (Map.Entry<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> appIdToBuffers :
         bufferPool.entrySet()) {
       String appId = appIdToBuffers.getKey();
@@ -500,13 +504,19 @@ public class ShuffleBufferManager {
             for (Map.Entry<Range<Integer>, ShuffleBuffer> rangeEntry :
                 shuffleIdToBuffers.getValue().asMapOfRanges().entrySet()) {
               Range<Integer> range = rangeEntry.getKey();
+              ShuffleBuffer shuffleBuffer = rangeEntry.getValue();
+              pickedFlushSize += shuffleBuffer.getSize();
               flushBuffer(
-                  rangeEntry.getValue(),
+                  shuffleBuffer,
                   appId,
                   shuffleId,
                   range.lowerEndpoint(),
                   range.upperEndpoint(),
                   isHugePartition(appId, shuffleId, range.lowerEndpoint()));
+              if (pickedFlushSize > expectedFlushSize) {
+                LOG.info("Already picked enough buffers to flush {} bytes", pickedFlushSize);
+                return;
+              }
             }
           }
         }
diff --git a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 56d218980..8e958b22b 100644
--- a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -447,10 +447,11 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, 
createData(6, 64));
     assertEquals(384, shuffleBufferManager.getUsedMemory());
     shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, 
createData(8, 64));
-    waitForFlush(shuffleFlushManager, appId, shuffleId, 5);
-    assertEquals(0, shuffleBufferManager.getUsedMemory());
+    waitForFlush(shuffleFlushManager, appId, shuffleId, 4, 96);
     assertEquals(0, shuffleBufferManager.getInFlushSize());
 
+    shuffleBufferManager.removeBuffer(appId);
+    shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
     shuffleBufferManager.registerBuffer("bufferSizeTest1", shuffleId, 0, 1);
     shuffleBufferManager.cacheShuffleData(appId, shuffleId, false, 
createData(0, 32));
     assertEquals(64, shuffleBufferManager.getUsedMemory());
@@ -630,6 +631,16 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
   private void waitForFlush(
       ShuffleFlushManager shuffleFlushManager, String appId, int shuffleId, 
int expectedBlockNum)
       throws Exception {
+    waitForFlush(shuffleFlushManager, appId, shuffleId, expectedBlockNum, 0);
+  }
+
+  private void waitForFlush(
+      ShuffleFlushManager shuffleFlushManager,
+      String appId,
+      int shuffleId,
+      int expectedBlockNum,
+      long expectedUsedMemory)
+      throws Exception {
     int retry = 0;
     long committedCount = 0;
     do {
@@ -649,7 +660,7 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     // `shuffleBufferManager.getUsedMemory()` and 
`shuffleBufferManager.getInFlushSize()`.
     Awaitility.await()
         .atMost(Duration.ofSeconds(5))
-        .until(() -> shuffleBufferManager.getUsedMemory() == 0);
+        .until(() -> shuffleBufferManager.getUsedMemory() == expectedUsedMemory);
   }
 
   @Test

Reply via email to