This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 6599ef27e [#1809] fix(server): add block counter for skip-list buffer
(#1846)
6599ef27e is described below
commit 6599ef27e257d117ffb01855cf0d2e8351b90eec
Author: xianjingfeng <[email protected]>
AuthorDate: Tue Jul 2 15:54:56 2024 +0800
[#1809] fix(server): add block counter for skip-list buffer (#1846)
### What changes were proposed in this pull request?
Add block counter for SKIP_LIST buffer.
### Why are the changes needed?
The performance of getting the size of skip-list is not good.
Fix: #1809
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Existing UT.
---
.../main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java | 2 ++
.../org/apache/uniffle/server/buffer/ShuffleBufferManager.java | 4 ++--
.../apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java | 5 +++++
.../apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java | 8 ++++++++
4 files changed, 17 insertions(+), 2 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index cb01bb0a9..1884a50c0 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -57,6 +57,8 @@ public interface ShuffleBuffer {
/** Only for test */
List<ShufflePartitionedBlock> getBlocks();
+ int getBlockCount();
+
void release();
void clearInFlushBuffer(long eventId);
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index d17716daf..c62ebd8c5 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -278,7 +278,7 @@ public class ShuffleBufferManager {
// than rss.server.flush.cold.storage.threshold.size, otherwise cold
storage will be useless.
if ((isHugePartition || this.bufferFlushEnabled)
&& (buffer.getSize() > this.bufferFlushThreshold
- || buffer.getBlocks().size() > bufferFlushBlocksNumThreshold)) {
+ || buffer.getBlockCount() > bufferFlushBlocksNumThreshold)) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Start to flush single buffer. Details - shuffleId:{},
startPartition:{}, endPartition:{}, isHugePartition:{}, bufferSize:{},
blocksNum:{}",
@@ -287,7 +287,7 @@ public class ShuffleBufferManager {
endPartition,
isHugePartition,
buffer.getSize(),
- buffer.getBlocks().size());
+ buffer.getBlockCount());
}
flushBuffer(buffer, appId, shuffleId, startPartition, endPartition,
isHugePartition);
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index c01bb0823..d37dc446f 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -105,6 +105,11 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
return blocks;
}
+ @Override
+ public int getBlockCount() {
+ return getBlocks().size();
+ }
+
@Override
public void release() {
blocks.forEach(spb -> spb.getData().release());
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 4783b4f56..db0ce2543 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -42,6 +42,7 @@ import org.apache.uniffle.server.ShuffleFlushManager;
public class ShuffleBufferWithSkipList extends AbstractShuffleBuffer {
private ConcurrentSkipListMap<Long, ShufflePartitionedBlock> blocksMap;
private final Map<Long, ConcurrentSkipListMap<Long,
ShufflePartitionedBlock>> inFlushBlockMap;
+ private int blockCount;
public ShuffleBufferWithSkipList(long capacity) {
super(capacity);
@@ -63,6 +64,7 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
synchronized (this) {
for (ShufflePartitionedBlock block : data.getBlockList()) {
blocksMap.put(block.getBlockId(), block);
+ blockCount++;
mSize += block.getSize();
}
size += mSize;
@@ -94,6 +96,7 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
});
inFlushBlockMap.put(eventId, blocksMap);
blocksMap = newConcurrentSkipListMap();
+ blockCount = 0;
size = 0;
return event;
}
@@ -103,6 +106,11 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
return new LinkedList<>(blocksMap.values());
}
+ @Override
+ public int getBlockCount() {
+ return blockCount;
+ }
+
@Override
public void release() {
blocksMap.values().forEach(spb -> spb.getData().release());