This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 49225d820 [#2130] fix(server): Potential huge partition limit invalid when flushing is slow (#2131)
49225d820 is described below
commit 49225d8205c791fe5b8ca933916eaa96abbf5bc7
Author: Junfan Zhang <[email protected]>
AuthorDate: Fri Sep 20 13:20:58 2024 +0800
[#2130] fix(server): Potential huge partition limit invalid when flushing is slow (#2131)
### What changes were proposed in this pull request?
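Fix the huge-partition memory limit being bypassed when flushing is slow. Previously, the limit check counted only the data still held in the partition's shuffle buffer. Data already handed off to a flush event was not counted, so a huge partition with pending flushes could pass the memory-limit check. Each shuffle buffer now tracks its in-flush size: it is incremented when a flush event is created and decremented in the event's cleanup callback. The check now compares the in-flush size plus the current buffer size against the limit.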
### Why are the changes needed?
Fix: #2130
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
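Unit tests: added a case to ShuffleTaskManagerTest that delays `ShuffleFlushManager.addToFlushQueue` via a Mockito spy. It lowers the buffer flush threshold and verifies that `requireBuffer` throws `NoBufferForHugePartitionException` while part of the huge partition's data is still waiting to be flushed.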
Co-authored-by: Junfan Zhang <[email protected]>
---
.../apache/uniffle/server/HugePartitionUtils.java | 9 ++++-----
.../apache/uniffle/server/ShuffleTaskManager.java | 8 +++++++-
.../server/buffer/AbstractShuffleBuffer.java | 8 ++++++++
.../uniffle/server/buffer/ShuffleBuffer.java | 2 ++
.../server/buffer/ShuffleBufferManager.java | 5 +++++
.../server/buffer/ShuffleBufferWithLinkedList.java | 2 ++
.../server/buffer/ShuffleBufferWithSkipList.java | 2 ++
.../uniffle/server/ShuffleTaskManagerTest.java | 23 ++++++++++++++++++++++
8 files changed, 53 insertions(+), 6 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
index 1db5c349c..3d7099c8b 100644
--- a/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
+++ b/server/src/main/java/org/apache/uniffle/server/HugePartitionUtils.java
@@ -21,6 +21,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import
org.apache.uniffle.common.exception.ExceedHugePartitionHardLimitException;
+import org.apache.uniffle.server.buffer.ShuffleBuffer;
import org.apache.uniffle.server.buffer.ShuffleBufferManager;
/** Huge partition utils. */
@@ -130,11 +131,9 @@ public class HugePartitionUtils {
int partitionId,
long usedPartitionDataSize) {
if (usedPartitionDataSize >
shuffleBufferManager.getHugePartitionSizeThreshold()) {
- long memoryUsed =
- shuffleBufferManager
- .getShuffleBufferEntry(appId, shuffleId, partitionId)
- .getValue()
- .getSize();
+ ShuffleBuffer buffer =
+ shuffleBufferManager.getShuffleBufferEntry(appId, shuffleId,
partitionId).getValue();
+ long memoryUsed = buffer.getInFlushSize() + buffer.getSize();
if (memoryUsed > shuffleBufferManager.getHugePartitionMemoryLimitSize())
{
LOG.warn(
"AppId: {}, shuffleId: {}, partitionId: {}, memory used: {}, "
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
index d5e747071..07509d555 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleTaskManager.java
@@ -93,7 +93,7 @@ import static
org.apache.uniffle.server.ShuffleServerMetrics.REQUIRE_BUFFER_COUN
public class ShuffleTaskManager {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleTaskManager.class);
- private final ShuffleFlushManager shuffleFlushManager;
+ private ShuffleFlushManager shuffleFlushManager;
private final ScheduledExecutorService scheduledExecutorService;
private final ScheduledExecutorService expiredAppCleanupExecutorService;
private final ScheduledExecutorService leakShuffleDataCheckExecutorService;
@@ -1033,4 +1033,10 @@ public class ShuffleTaskManager {
protected void setStorageManager(StorageManager storageManager) {
this.storageManager = storageManager;
}
+
+ // only for tests
+ @VisibleForTesting
+ protected void setShuffleFlushManager(ShuffleFlushManager flushManager) {
+ this.shuffleFlushManager = flushManager;
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
index 3f4549c7b..62e2728bf 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/AbstractShuffleBuffer.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.server.buffer;
import java.util.Collection;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import com.google.common.collect.Lists;
@@ -41,6 +42,8 @@ public abstract class AbstractShuffleBuffer implements
ShuffleBuffer {
protected long size;
+ protected AtomicLong inFlushSize = new AtomicLong();
+
public AbstractShuffleBuffer() {
this.size = 0;
}
@@ -178,4 +181,9 @@ public abstract class AbstractShuffleBuffer implements
ShuffleBuffer {
}
}
}
+
+ @Override
+ public long getInFlushSize() {
+ return inFlushSize.get();
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index 1b29e3b41..bc3d5a552 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -52,6 +52,8 @@ public interface ShuffleBuffer {
long getSize();
+ long getInFlushSize();
+
/** Only for test */
List<ShufflePartitionedBlock> getBlocks();
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 8cade02ef..39002a0dc 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -757,4 +757,9 @@ public class ShuffleBufferManager {
public void setUsedMemory(long usedMemory) {
this.usedMemory.set(usedMemory);
}
+
+ @VisibleForTesting
+ public void setBufferFlushThreshold(long bufferFlushThreshold) {
+ this.bufferFlushThreshold = bufferFlushThreshold;
+ }
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index a9e8ddc1a..b50d9fb90 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -92,9 +92,11 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
() -> {
this.clearInFlushBuffer(event.getEventId());
spBlocks.forEach(spb -> spb.getData().release());
+ inFlushSize.addAndGet(-event.getSize());
});
inFlushBlockMap.put(eventId, inFlushedQueueBlocks);
blocks.clear();
+ inFlushSize.addAndGet(size);
size = 0;
return event;
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 960ab94f5..5d91e9e01 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -92,10 +92,12 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
() -> {
this.clearInFlushBuffer(event.getEventId());
spBlocks.forEach(spb -> spb.getData().release());
+ inFlushSize.addAndGet(-event.getSize());
});
inFlushBlockMap.put(eventId, blocksMap);
blocksMap = newConcurrentSkipListMap();
blockCount = 0;
+ inFlushSize.addAndGet(size);
size = 0;
return event;
}
diff --git
a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
index 5188d4706..da343a902 100644
--- a/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
+++ b/server/src/test/java/org/apache/uniffle/server/ShuffleTaskManagerTest.java
@@ -238,6 +238,29 @@ public class ShuffleTaskManagerTest extends HadoopTestBase
{
assertTrue(e instanceof NoBufferForHugePartitionException);
}
+ // test: to simulate the flush delay, it also will limit the huge
partition if the partial data
+ // is in flush queue.
+ ShuffleFlushManager shuffleFlushManager =
shuffleServer.getShuffleFlushManager();
+ ShuffleFlushManager spyFlushManager = spy(shuffleFlushManager);
+ doAnswer(
+ invocationOnMock -> {
+ Thread.sleep(10000);
+ return invocationOnMock.callRealMethod();
+ })
+ .when(spyFlushManager)
+ .addToFlushQueue(any(ShuffleDataFlushEvent.class));
+ shuffleTaskManager.setShuffleFlushManager(spyFlushManager);
+ shuffleServer.getShuffleBufferManager().setBufferFlushThreshold(1024);
+ partitionedData0 = createPartitionedData(1, 1, 500);
+ shuffleTaskManager.cacheShuffleData(appId, shuffleId, true,
partitionedData0);
+ shuffleTaskManager.updateCachedBlockIds(appId, shuffleId, 1,
partitionedData0.getBlockList());
+ try {
+ shuffleTaskManager.requireBuffer(appId, 1, Arrays.asList(1),
Arrays.asList(500), 500);
+ fail("Should throw NoBufferForHugePartitionException");
+ } catch (Exception e) {
+ assertTrue(e instanceof NoBufferForHugePartitionException);
+ }
+
// metrics test
assertEquals(0,
ShuffleServerMetrics.counterTotalRequireBufferFailedForHugePartition.get());
assertEquals(0,
ShuffleServerMetrics.counterTotalRequireBufferFailedForRegularPartition.get());