This is an automated email from the ASF dual-hosted git repository.
xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 945c08011 [MINOR] feat(server): Introduce in flush block count metrics
in buffer pool (#2183)
945c08011 is described below
commit 945c0801104713983e505494f71f0bce8e2a39e0
Author: maobaolong <[email protected]>
AuthorDate: Tue Oct 15 16:12:13 2024 +0800
[MINOR] feat(server): Introduce in flush block count metrics in buffer pool
(#2183)
### What changes were proposed in this pull request?
Introduce a metric for the number of in-flush blocks in the buffer pool.
### Why are the changes needed?
Provide insight into all blocks held by the server, including both the blocks in the buffer pool and the blocks that are currently being flushed.
### Does this PR introduce any user-facing change?
Yes. A new metric named in_flush_block_count_in_buffer_pool is added.
### How was this patch tested?
Test locally.
---
.../org/apache/uniffle/server/ShuffleServerMetrics.java | 2 ++
.../org/apache/uniffle/server/buffer/ShuffleBuffer.java | 2 ++
.../uniffle/server/buffer/ShuffleBufferManager.java | 16 +++++++++++++---
.../server/buffer/ShuffleBufferWithLinkedList.java | 5 +++++
.../uniffle/server/buffer/ShuffleBufferWithSkipList.java | 5 +++++
5 files changed, 27 insertions(+), 3 deletions(-)
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index c24e1b7b7..ebe8b0451 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -160,6 +160,8 @@ public class ShuffleServerMetrics {
public static final String REQUIRE_BUFFER_COUNT = "require_buffer_count";
+ public static final String IN_FLUSH_BLOCK_COUNT_IN_BUFFER_POOL
+ = "in_flush_block_count_in_buffer_pool";
public static final String BLOCK_COUNT_IN_BUFFER_POOL =
"block_count_in_buffer_pool";
public static final String BUFFER_COUNT_IN_BUFFER_POOL =
"buffer_count_in_buffer_pool";
public static final String SHUFFLE_COUNT_IN_BUFFER_POOL =
"shuffle_count_in_buffer_pool";
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index 890561bbd..522f9a057 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -59,6 +59,8 @@ public interface ShuffleBuffer {
int getBlockCount();
+ long getInFlushBlockCount();
+
long release();
void clearInFlushBuffer(long eventId);
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 7ad5e40ab..58ce7665b 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -56,6 +56,7 @@ import org.apache.uniffle.server.ShuffleTaskManager;
import static
org.apache.uniffle.server.ShuffleServerMetrics.BLOCK_COUNT_IN_BUFFER_POOL;
import static
org.apache.uniffle.server.ShuffleServerMetrics.BUFFER_COUNT_IN_BUFFER_POOL;
+import static
org.apache.uniffle.server.ShuffleServerMetrics.IN_FLUSH_BLOCK_COUNT_IN_BUFFER_POOL;
import static
org.apache.uniffle.server.ShuffleServerMetrics.SHUFFLE_COUNT_IN_BUFFER_POOL;
public class ShuffleBufferManager {
@@ -152,7 +153,16 @@ public class ShuffleBufferManager {
bufferPool.values().stream()
.flatMap(innerMap -> innerMap.values().stream())
.flatMap(rangeMap ->
rangeMap.asMapOfRanges().values().stream())
- .mapToInt(shuffleBuffer -> shuffleBuffer.getBlockCount())
+ .mapToLong(shuffleBuffer -> shuffleBuffer.getBlockCount())
+ .sum(),
+ 2 * 60 * 1000L /* 2 minutes */);
+ ShuffleServerMetrics.addLabeledCacheGauge(
+ IN_FLUSH_BLOCK_COUNT_IN_BUFFER_POOL,
+ () ->
+ bufferPool.values().stream()
+ .flatMap(innerMap -> innerMap.values().stream())
+ .flatMap(rangeMap ->
rangeMap.asMapOfRanges().values().stream())
+ .mapToLong(shuffleBuffer ->
shuffleBuffer.getInFlushBlockCount())
.sum(),
2 * 60 * 1000L /* 2 minutes */);
ShuffleServerMetrics.addLabeledCacheGauge(
@@ -160,12 +170,12 @@ public class ShuffleBufferManager {
() ->
bufferPool.values().stream()
.flatMap(innerMap -> innerMap.values().stream())
- .mapToInt(rangeMap -> rangeMap.asMapOfRanges().size())
+ .mapToLong(rangeMap -> rangeMap.asMapOfRanges().size())
.sum(),
2 * 60 * 1000L /* 2 minutes */);
ShuffleServerMetrics.addLabeledGauge(
SHUFFLE_COUNT_IN_BUFFER_POOL,
- () -> bufferPool.values().stream().mapToInt(innerMap ->
innerMap.size()).sum());
+ () -> bufferPool.values().stream().mapToLong(innerMap ->
innerMap.size()).sum());
}
public void setShuffleTaskManager(ShuffleTaskManager taskManager) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index aa4cc2010..7a95ac5d0 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -119,6 +119,11 @@ public class ShuffleBufferWithLinkedList extends
AbstractShuffleBuffer {
return getBlocks().size();
}
+ @Override
+ public long getInFlushBlockCount() {
+ return inFlushBlockMap.values().stream().mapToLong(Set::size).sum();
+ }
+
@Override
public synchronized long release() {
Throwable lastException = null;
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 57d2b1982..d7a748903 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -120,6 +120,11 @@ public class ShuffleBufferWithSkipList extends
AbstractShuffleBuffer {
return blockCount;
}
+ @Override
+ public long getInFlushBlockCount() {
+ return inFlushBlockMap.values().stream().mapToLong(Map::size).sum();
+ }
+
@Override
public synchronized long release() {
Throwable lastException = null;