This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 49225d820 [#2130] fix(server): Potential huge partition limit invalid when flushing is slow (#2131)
49225d820 is described below

commit 49225d8205c791fe5b8ca933916eaa96abbf5bc7
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Sep 20 13:20:58 2024 +0800

    [#2130] fix(server): Potential huge partition limit invalid when flushing is slow (#2131)
    
    ### What changes were proposed in this pull request?
    
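    Fix a case where the huge-partition memory limit could be bypassed when flushing is slow.
    The check previously counted only the data still held in the partition's buffer, so data
    already handed off to the flush queue but not yet written was ignored. The check now also
    includes the partition's in-flush size.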
    
    ### Why are the changes needed?
    
    Fix: #2130
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
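    Unit tests: added a case to ShuffleTaskManagerTest that delays addToFlushQueue and verifies
    that requireBuffer still throws NoBufferForHugePartitionException while part of the
    partition's data is waiting in the flush queue.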
    
    
    Co-authored-by: Junfan Zhang <[email protected]>
---
 .../apache/uniffle/server/HugePartitionUtils.java  |  9 ++++-----
 .../apache/uniffle/server/ShuffleTaskManager.java  |  8 +++++++-
 .../server/buffer/AbstractShuffleBuffer.java       |  8 ++++++++
 .../uniffle/server/buffer/ShuffleBuffer.java       |  2 ++
 .../server/buffer/ShuffleBufferManager.java        |  5 +++++
 .../server/buffer/ShuffleBufferWithLinkedList.java |  2 ++
 .../server/buffer/ShuffleBufferWithSkipList.java   |  2 ++
 .../uniffle/server/ShuffleTaskManagerTest.java     | 23 ++++++++++++++++++++++
 8 files changed, 53 insertions(+), 6 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java 
b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
index 1db5c349c..3d7099c8b 100644
--- a/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
+++ b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
@@ -21,6 +21,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import 
org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException;
+import org.apache.uniffle.server.buffer.ShuffleBuffer;
 import org.apache.uniffle.server.buffer.ShuffleBufferManager;
 
 /** Huge partition utils. */
@@ -130,11 +131,9 @@ public class HugePartitionUtils {
       int partitionId,
       long usedPartitionDataSize) {
     if (usedPartitionDataSize > 
shuffleBufferManager.getHugePartitionSizeThreshold()) {
-      long memoryUsed =
-          shuffleBufferManager
-              .getShuffleBufferEntry(appId, shuffleId, partitionId)
-              .getValue()
-              .getSize();
+      ShuffleBuffer buffer =
+          shuffleBufferManager.getShuffleBufferEntry(appId, shuffleId, 
partitionId).getValue();
+      long memoryUsed = buffer.getInFlushSize() + buffer.getSize();
       if (memoryUsed > shuffleBufferManager.getHugePartitionMemoryLimitSize()) 
{
         LOG.warn(
             "AppId: {}, shuffleId: {}, partitionId: {}, memory used: {}, "
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index d5e747071..07509d555 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -93,7 +93,7 @@ import static 
org.apache.uniffle.server.ShuffleServerMetrics.REQUIRE_BUFFER_COUN
 public class ShuffleTaskManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleTaskManager.class);
-  private final ShuffleFlushManager shuffleFlushManager;
+  private ShuffleFlushManager shuffleFlushManager;
   private final ScheduledExecutorService scheduledExecutorService;
   private final ScheduledExecutorService expiredAppCleanupExecutorService;
   private final ScheduledExecutorService leakShuffleDataCheckExecutorService;
@@ -1033,4 +1033,10 @@ public class ShuffleTaskManager {
   protected void setStorageManager(StorageManager storageManager) {
     this.storageManager = storageManager;
   }
+
+  // only for tests
+  @VisibleForTesting
+  protected void setShuffleFlushManager(ShuffleFlushManager flushManager) {
+    this.shuffleFlushManager = flushManager;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index 3f4549c7b..62e2728bf 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server.buffer;
 
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
 import com.google.common.collect.Lists;
@@ -41,6 +42,8 @@ public abstract class AbstractShuffleBuffer implements 
ShuffleBuffer {
 
   protected long size;
 
+  protected AtomicLong inFlushSize = new AtomicLong();
+
   public AbstractShuffleBuffer() {
     this.size = 0;
   }
@@ -178,4 +181,9 @@ public abstract class AbstractShuffleBuffer implements 
ShuffleBuffer {
       }
     }
   }
+
+  @Override
+  public long getInFlushSize() {
+    return inFlushSize.get();
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index 1b29e3b41..bc3d5a552 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -52,6 +52,8 @@ public interface ShuffleBuffer {
 
   long getSize();
 
+  long getInFlushSize();
+
   /** Only for test */
   List<ShufflePartitionedBlock> getBlocks();
 
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 8cade02ef..39002a0dc 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -757,4 +757,9 @@ public class ShuffleBufferManager {
   public void setUsedMemory(long usedMemory) {
     this.usedMemory.set(usedMemory);
   }
+
+  @VisibleForTesting
+  public void setBufferFlushThreshold(long bufferFlushThreshold) {
+    this.bufferFlushThreshold = bufferFlushThreshold;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index a9e8ddc1a..b50d9fb90 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -92,9 +92,11 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
         () -> {
           this.clearInFlushBuffer(event.getEventId());
           spBlocks.forEach(spb -> spb.getData().release());
+          inFlushSize.addAndGet(-event.getSize());
         });
     inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
     blocks.clear();
+    inFlushSize.addAndGet(size);
     size = 0;
     return event;
   }
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 960ab94f5..5d91e9e01 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -92,10 +92,12 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
         () -> {
           this.clearInFlushBuffer(event.getEventId());
           spBlocks.forEach(spb -> spb.getData().release());
+          inFlushSize.addAndGet(-event.getSize());
         });
     inFlushBlockMap.put(eventId, blocksMap);
     blocksMap = newConcurrentSkipListMap();
     blockCount = 0;
+    inFlushSize.addAndGet(size);
     size = 0;
     return event;
   }
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 5188d4706..da343a902 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -238,6 +238,29 @@ public class ShuffleTaskManagerTest extends HadoopTestBase 
{
       assertTrue(e instanceof NoBufferForHugePartitionException);
     }
 
+    // test: simulate a flush delay; the huge partition must still be limited while part of its
+    // data is waiting in the flush queue.
+    ShuffleFlushManager shuffleFlushManager = 
shuffleServer.getShuffleFlushManager();
+    ShuffleFlushManager spyFlushManager = spy(shuffleFlushManager);
+    doAnswer(
+            invocationOnMock -> {
+              Thread.sleep(10000);
+              return invocationOnMock.callRealMethod();
+            })
+        .when(spyFlushManager)
+        .addToFlushQueue(any(ShuffleDataFlushEvent.class));
+    shuffleTaskManager.setShuffleFlushManager(spyFlushManager);
+    shuffleServer.getShuffleBufferManager().setBufferFlushThreshold(1024);
+    partitionedData0 = createPartitionedData(1, 1, 500);
+    shuffleTaskManager.cacheShuffleData(appId, shuffleId, true, 
partitionedData0);
+    shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1, 
partitionedData0.getBlockList());
+    try {
+      shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1), 
Arrays.asList(500), 500);
+      fail("Should throw NoBufferForHugePartitionException");
+    } catch (Exception e) {
+      assertTrue(e instanceof NoBufferForHugePartitionException);
+    }
+
     // metrics test
     assertEquals(0, 
ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.get());
     assertEquals(0, 
ShuffleServerMetrics.counterTotalRequireBufferFailedForRegularPartition.get());

Reply via email to