This is an automated email from the ASF dual-hosted git repository.

xianjingfeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 945c08011 [MINOR] feat(server): Introduce in flush block count metrics 
in buffer pool (#2183)
945c08011 is described below

commit 945c0801104713983e505494f71f0bce8e2a39e0
Author: maobaolong <[email protected]>
AuthorDate: Tue Oct 15 16:12:13 2024 +0800

    [MINOR] feat(server): Introduce in flush block count metrics in buffer pool 
(#2183)
    
    ### What changes were proposed in this pull request?
    Introduce in flush block count metrics in buffer pool
    
    ### Why are the changes needed?
    Insight all blocks in server include buffer pool and in flush blocks.
    
    ### Does this PR introduce any user-facing change?
    New metrics named in_flush_block_count_in_buffer_pool
    ### How was this patch tested?
    Test locally.
---
 .../org/apache/uniffle/server/ShuffleServerMetrics.java  |  2 ++
 .../org/apache/uniffle/server/buffer/ShuffleBuffer.java  |  2 ++
 .../uniffle/server/buffer/ShuffleBufferManager.java      | 16 +++++++++++++---
 .../server/buffer/ShuffleBufferWithLinkedList.java       |  5 +++++
 .../uniffle/server/buffer/ShuffleBufferWithSkipList.java |  5 +++++
 5 files changed, 27 insertions(+), 3 deletions(-)

diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index c24e1b7b7..ebe8b0451 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -160,6 +160,8 @@ public class ShuffleServerMetrics {
 
   public static final String REQUIRE_BUFFER_COUNT = "require_buffer_count";
 
+  public static final String IN_FLUSH_BLOCK_COUNT_IN_BUFFER_POOL
+      = "in_flush_block_count_in_buffer_pool";
   public static final String BLOCK_COUNT_IN_BUFFER_POOL = 
"block_count_in_buffer_pool";
   public static final String BUFFER_COUNT_IN_BUFFER_POOL = 
"buffer_count_in_buffer_pool";
   public static final String SHUFFLE_COUNT_IN_BUFFER_POOL = 
"shuffle_count_in_buffer_pool";
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
index 890561bbd..522f9a057 100644
--- a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
+++ b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBuffer.java
@@ -59,6 +59,8 @@ public interface ShuffleBuffer {
 
   int getBlockCount();
 
+  long getInFlushBlockCount();
+
   long release();
 
   void clearInFlushBuffer(long eventId);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 7ad5e40ab..58ce7665b 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -56,6 +56,7 @@ import org.apache.uniffle.server.ShuffleTaskManager;
 
 import static 
org.apache.uniffle.server.ShuffleServerMetrics.BLOCK_COUNT_IN_BUFFER_POOL;
 import static 
org.apache.uniffle.server.ShuffleServerMetrics.BUFFER_COUNT_IN_BUFFER_POOL;
+import static 
org.apache.uniffle.server.ShuffleServerMetrics.IN_FLUSH_BLOCK_COUNT_IN_BUFFER_POOL;
 import static 
org.apache.uniffle.server.ShuffleServerMetrics.SHUFFLE_COUNT_IN_BUFFER_POOL;
 
 public class ShuffleBufferManager {
@@ -152,7 +153,16 @@ public class ShuffleBufferManager {
             bufferPool.values().stream()
                 .flatMap(innerMap -> innerMap.values().stream())
                 .flatMap(rangeMap -> 
rangeMap.asMapOfRanges().values().stream())
-                .mapToInt(shuffleBuffer -> shuffleBuffer.getBlockCount())
+                .mapToLong(shuffleBuffer -> shuffleBuffer.getBlockCount())
+                .sum(),
+        2 * 60 * 1000L /* 2 minutes */);
+    ShuffleServerMetrics.addLabeledCacheGauge(
+        IN_FLUSH_BLOCK_COUNT_IN_BUFFER_POOL,
+        () ->
+            bufferPool.values().stream()
+                .flatMap(innerMap -> innerMap.values().stream())
+                .flatMap(rangeMap -> 
rangeMap.asMapOfRanges().values().stream())
+                .mapToLong(shuffleBuffer -> 
shuffleBuffer.getInFlushBlockCount())
                 .sum(),
         2 * 60 * 1000L /* 2 minutes */);
     ShuffleServerMetrics.addLabeledCacheGauge(
@@ -160,12 +170,12 @@ public class ShuffleBufferManager {
         () ->
             bufferPool.values().stream()
                 .flatMap(innerMap -> innerMap.values().stream())
-                .mapToInt(rangeMap -> rangeMap.asMapOfRanges().size())
+                .mapToLong(rangeMap -> rangeMap.asMapOfRanges().size())
                 .sum(),
         2 * 60 * 1000L /* 2 minutes */);
     ShuffleServerMetrics.addLabeledGauge(
         SHUFFLE_COUNT_IN_BUFFER_POOL,
-        () -> bufferPool.values().stream().mapToInt(innerMap -> 
innerMap.size()).sum());
+        () -> bufferPool.values().stream().mapToLong(innerMap -> 
innerMap.size()).sum());
   }
 
   public void setShuffleTaskManager(ShuffleTaskManager taskManager) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
index aa4cc2010..7a95ac5d0 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithLinkedList.java
@@ -119,6 +119,11 @@ public class ShuffleBufferWithLinkedList extends 
AbstractShuffleBuffer {
     return getBlocks().size();
   }
 
+  @Override
+  public long getInFlushBlockCount() {
+    return inFlushBlockMap.values().stream().mapToLong(Set::size).sum();
+  }
+
   @Override
   public synchronized long release() {
     Throwable lastException = null;
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
index 57d2b1982..d7a748903 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferWithSkipList.java
@@ -120,6 +120,11 @@ public class ShuffleBufferWithSkipList extends 
AbstractShuffleBuffer {
     return blockCount;
   }
 
+  @Override
+  public long getInFlushBlockCount() {
+    return inFlushBlockMap.values().stream().mapToLong(Map::size).sum();
+  }
+
   @Override
   public synchronized long release() {
     Throwable lastException = null;

Reply via email to