This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 89d2f1c8d [#1585] feat(server): Support app-level block size 
statistics to report metrics (#1593)
89d2f1c8d is described below

commit 89d2f1c8d08df264afd6eb8f7c956eecc8182e5a
Author: leslizhang <[email protected]>
AuthorDate: Tue Apr 30 19:30:56 2024 +0800

    [#1585] feat(server): Support app-level block size statistics to report 
metrics (#1593)
    
    ### What changes were proposed in this pull request?
    
    Added an app-level shuffle block size metric of type histogram to the shuffle server.
    
    ### Why are the changes needed?
    Related feature: https://github.com/apache/incubator-uniffle/issues/1585
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    New unit test (`blockSizeMetricsTest` in `ShuffleBufferManagerTest`).
    
    Co-authored-by: leslizhang <[email protected]>
    Co-authored-by: Enrico Minack <[email protected]>
---
 .../spark/shuffle/writer/WriteBufferManager.java   |  9 +++++
 .../apache/uniffle/common/config/ConfigUtils.java  | 11 ++++++
 .../uniffle/common/metrics/MetricsManager.java     |  4 ++
 .../org/apache/uniffle/common/util/Constants.java  |  1 +
 .../org/apache/uniffle/server/ShuffleServer.java   |  2 +-
 .../apache/uniffle/server/ShuffleServerConf.java   | 14 +++++++
 .../uniffle/server/ShuffleServerMetrics.java       | 28 ++++++++++++--
 .../server/buffer/ShuffleBufferManager.java        | 15 ++++++++
 .../server/buffer/ShuffleBufferManagerTest.java    | 44 ++++++++++++++++++++++
 9 files changed, 123 insertions(+), 5 deletions(-)

diff --git 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index efe376a34..f5fa497bb 100644
--- 
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++ 
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -179,7 +179,14 @@ public class WriteBufferManager extends MemoryConsumer {
 
     // check buffer size > spill threshold
     if (usedBytes.get() - inSendListBytes.get() > spillSize) {
+      LOG.info(
+          "ShuffleBufferManager spill for buffer size exceeding spill 
threshold,"
+              + "usedBytes[{}],inSendListBytes[{}],spillSize[{}]",
+          usedBytes.get(),
+          inSendListBytes.get(),
+          spillSize);
       List<ShuffleBlockInfo> multiSendingBlocks = clear();
+
       multiSendingBlocks.addAll(singleOrEmptySendingBlocks);
       writeTime += System.currentTimeMillis() - start;
       return multiSendingBlocks;
@@ -316,6 +323,8 @@ public class WriteBufferManager extends MemoryConsumer {
             + dataSize
             + "], memoryUsed["
             + memoryUsed
+            + "], number of blocks["
+            + result.size()
             + "]");
     return result;
   }
diff --git 
a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java 
b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
index 14777e8e0..64f0ee1d5 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
@@ -181,6 +181,17 @@ public class ConfigUtils {
     return Double.parseDouble(o.toString());
   }
 
+  public static double[] convertBytesStringToDoubleArray(Object o) {
+    if (o == null) {
+      return new double[0];
+    } else {
+      return Arrays.stream(o.toString().split(","))
+          .map(UnitConverter::byteStringAsBytes)
+          .mapToDouble(l -> (double) l)
+          .toArray();
+    }
+  }
+
   @SuppressWarnings("unchecked")
   public static List<ConfigOption<Object>> getAllConfigOptions(
       Class<? extends RssBaseConf> confClass) {
diff --git 
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java 
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
index 10b7542ac..e2ae3106c 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
@@ -83,6 +83,10 @@ public class MetricsManager {
     return addHistogram(name, "Histogram " + name, buckets, labels);
   }
 
+  public Histogram addHistogram(String name, double[] buckets) {
+    return addHistogram(name, "Histogram " + name, buckets, defaultLabelNames);
+  }
+
   public Histogram addHistogram(String name, String help, double[] buckets, 
String[] labels) {
     return Histogram.build()
         .name(name)
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java 
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 1f4dd0b06..ccaa51970 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -24,6 +24,7 @@ public final class Constants {
   // the value is used for client/server compatible, eg, online upgrade
   public static final String SHUFFLE_SERVER_VERSION = "ss_v5";
   public static final String METRICS_TAG_LABEL_NAME = "tags";
+  public static final String METRICS_APP_LABEL_NAME = "appId";
   public static final String COORDINATOR_TAG = "coordinator";
   public static final String SHUFFLE_DATA_FILE_SUFFIX = ".data";
   public static final String SHUFFLE_INDEX_FILE_SUFFIX = ".index";
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index bab2bcf44..79fe35b76 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -316,7 +316,7 @@ public class ShuffleServer {
     LOG.info("Register metrics");
     CollectorRegistry shuffleServerCollectorRegistry = new 
CollectorRegistry(true);
     String rawTags = getEncodedTags();
-    ShuffleServerMetrics.register(shuffleServerCollectorRegistry, rawTags);
+    ShuffleServerMetrics.register(shuffleServerCollectorRegistry, rawTags, 
shuffleServerConf);
     grpcMetrics = new ShuffleServerGrpcMetrics(this.shuffleServerConf, 
rawTags);
     grpcMetrics.register(new CollectorRegistry(true));
     nettyMetrics = new ShuffleServerNettyMetrics(shuffleServerConf, rawTags);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index a7d8254f4..088252779 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -572,6 +572,20 @@ public class ShuffleServerConf extends RssBaseConf {
           .withDescription(
               "keep alive time of thread pool that used for calc summary 
metric, in SECONDS.");
 
+  public static final ConfigOption<Boolean> 
APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED =
+      ConfigOptions.key("rss.server.metrics.blockSizeStatisticsEnabled")
+          .booleanType()
+          .defaultValue(false)
+          .withDescription("whether or not shuffle block size metric is 
enabled");
+
+  public static final ConfigOption<String> 
APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_BUCKETS =
+      ConfigOptions.key("rss.server.metrics.blockSizeStatistics.buckets")
+          .stringType()
+          .defaultValue("32kb,64kb,128kb,256kb,512kb,1mb,2mb,4mb,8mb,16mb")
+          .withDescription(
+              "A comma-separated block size list, where each value"
+                  + " can be suffixed with a memory size unit, such as kb or 
k, mb or m, etc.");
+
   public ShuffleServerConf() {}
 
   public ShuffleServerConf(String fileName) {
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index b4566b366..bee4dd48c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -24,18 +24,23 @@ import com.google.common.collect.Maps;
 import io.prometheus.client.CollectorRegistry;
 import io.prometheus.client.Counter;
 import io.prometheus.client.Gauge;
+import io.prometheus.client.Histogram;
 import io.prometheus.client.Summary;
 import org.apache.commons.lang3.StringUtils;
 
+import org.apache.uniffle.common.config.ConfigUtils;
 import org.apache.uniffle.common.metrics.MetricsManager;
 import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.storage.common.LocalStorage;
 
+import static org.apache.uniffle.common.util.Constants.METRICS_APP_LABEL_NAME;
+
 public class ShuffleServerMetrics {
 
   private static final String TOTAL_RECEIVED_DATA = "total_received_data";
   private static final String TOTAL_WRITE_DATA = "total_write_data";
   private static final String TOTAL_WRITE_BLOCK = "total_write_block";
+  private static final String WRITE_BLOCK_SIZE = "write_block_size";
   private static final String TOTAL_WRITE_TIME = "total_write_time";
   private static final String TOTAL_WRITE_HANDLER = "total_write_handler";
   private static final String TOTAL_WRITE_EXCEPTION = "total_write_exception";
@@ -148,6 +153,7 @@ public class ShuffleServerMetrics {
   public static Counter.Child counterTotalReceivedDataSize;
   public static Counter.Child counterTotalWriteDataSize;
   public static Counter.Child counterTotalWriteBlockSize;
+  public static Histogram appHistogramWriteBlockSize;
   public static Counter.Child counterTotalWriteTime;
   public static Counter.Child counterWriteException;
   public static Counter.Child counterWriteSlow;
@@ -231,20 +237,28 @@ public class ShuffleServerMetrics {
   private static MetricsManager metricsManager;
   private static boolean isRegister = false;
 
-  public static synchronized void register(CollectorRegistry 
collectorRegistry, String tags) {
+  public static synchronized void register(
+      CollectorRegistry collectorRegistry, String tags, ShuffleServerConf 
serverConf) {
     if (!isRegister) {
       ShuffleServerMetrics.tags = tags;
       Map<String, String> labels = Maps.newHashMap();
       labels.put(Constants.METRICS_TAG_LABEL_NAME, ShuffleServerMetrics.tags);
       metricsManager = new MetricsManager(collectorRegistry, labels);
       isRegister = true;
-      setUpMetrics();
+      setUpMetrics(serverConf);
     }
   }
 
+  public static void register(ShuffleServerConf serverConf) {
+    register(CollectorRegistry.defaultRegistry, 
Constants.SHUFFLE_SERVER_VERSION, serverConf);
+  }
+
   @VisibleForTesting
   public static void register() {
-    register(CollectorRegistry.defaultRegistry, 
Constants.SHUFFLE_SERVER_VERSION);
+    register(
+        CollectorRegistry.defaultRegistry,
+        Constants.SHUFFLE_SERVER_VERSION,
+        new ShuffleServerConf());
   }
 
   @VisibleForTesting
@@ -314,10 +328,16 @@ public class ShuffleServerMetrics {
     incHadoopStorageWriteDataSize(storageHost, size, false);
   }
 
-  private static void setUpMetrics() {
+  private static void setUpMetrics(ShuffleServerConf serverConf) {
     counterTotalReceivedDataSize = 
metricsManager.addLabeledCounter(TOTAL_RECEIVED_DATA);
     counterTotalWriteDataSize = 
metricsManager.addLabeledCounter(TOTAL_WRITE_DATA);
     counterTotalWriteBlockSize = 
metricsManager.addLabeledCounter(TOTAL_WRITE_BLOCK);
+    appHistogramWriteBlockSize =
+        metricsManager.addHistogram(
+            WRITE_BLOCK_SIZE,
+            ConfigUtils.convertBytesStringToDoubleArray(
+                
serverConf.get(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_BUCKETS)),
+            METRICS_APP_LABEL_NAME);
     counterTotalWriteTime = metricsManager.addLabeledCounter(TOTAL_WRITE_TIME);
     counterWriteException = 
metricsManager.addLabeledCounter(TOTAL_WRITE_EXCEPTION);
     counterWriteSlow = metricsManager.addLabeledCounter(TOTAL_WRITE_SLOW);
diff --git 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 578b6c498..8c1e39cf2 100644
--- 
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++ 
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.uniffle.server.buffer;
 
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
@@ -81,6 +82,7 @@ public class ShuffleBufferManager {
   protected Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>> 
bufferPool;
   // appId -> shuffleId -> shuffle size in buffer
   protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap = 
JavaUtils.newConcurrentMap();
+  private final boolean appBlockSizeMetricEnabled;
 
   public ShuffleBufferManager(
       ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager, boolean 
nettyServerEnabled) {
@@ -130,6 +132,8 @@ public class ShuffleBufferManager {
     this.hugePartitionMemoryLimitSize =
         Math.round(
             capacity * 
conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO));
+    appBlockSizeMetricEnabled =
+        
conf.getBoolean(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED);
   }
 
   public void setShuffleTaskManager(ShuffleTaskManager taskManager) {
@@ -180,6 +184,14 @@ public class ShuffleBufferManager {
     if (!isPreAllocated) {
       updateUsedMemory(size);
     }
+    if (appBlockSizeMetricEnabled) {
+      Arrays.stream(spd.getBlockList())
+          .forEach(
+              b -> {
+                int blockSize = b.getLength();
+                
ShuffleServerMetrics.appHistogramWriteBlockSize.labels(appId).observe(blockSize);
+              });
+    }
     updateShuffleSize(appId, shuffleId, size);
     synchronized (this) {
       flushSingleBufferIfNecessary(
@@ -337,6 +349,9 @@ public class ShuffleBufferManager {
     removeBufferByShuffleId(appId, shuffleIdToBuffers.keySet());
     shuffleSizeMap.remove(appId);
     bufferPool.remove(appId);
+    if (appBlockSizeMetricEnabled) {
+      ShuffleServerMetrics.appHistogramWriteBlockSize.remove(appId);
+    }
   }
 
   public synchronized boolean requireMemory(long size, boolean isPreAllocated) 
{
diff --git 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 08428cc74..56d218980 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.server.buffer;
 import java.io.File;
 import java.time.Duration;
 import java.util.Arrays;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -28,6 +29,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import com.google.common.collect.RangeMap;
 import com.google.common.util.concurrent.Uninterruptibles;
+import io.prometheus.client.Collector;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -38,6 +40,7 @@ import org.apache.uniffle.common.PartitionRange;
 import org.apache.uniffle.common.RemoteStorageInfo;
 import org.apache.uniffle.common.ShuffleDataResult;
 import org.apache.uniffle.common.ShufflePartitionedData;
+import org.apache.uniffle.common.config.ConfigUtils;
 import org.apache.uniffle.common.rpc.StatusCode;
 import org.apache.uniffle.common.util.ByteBufUtils;
 import org.apache.uniffle.common.util.Constants;
@@ -729,4 +732,45 @@ public class ShuffleBufferManagerTest extends 
BufferTestBase {
     assertEquals(0, shuffleBufferManager.getUsedMemory());
     assertEquals(0, shuffleBufferManager.getInFlushSize());
   }
+
+  @Test
+  public void blockSizeMetricsTest() {
+    String appId = "blockSizeMetricsTest";
+    shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
+    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+    
when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(rwLock.readLock());
+    
when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);
+    int shuffleId = 1;
+    shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
+
+    // cache shuffle block data, and record metrics
+    double[] buckets =
+        ConfigUtils.convertBytesStringToDoubleArray(
+            new ShuffleServerConf()
+                
.get(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_BUCKETS));
+    Arrays.stream(buckets)
+        .sorted()
+        .forEach(
+            bucket -> {
+              StatusCode sc =
+                  shuffleBufferManager.cacheShuffleData(
+                      appId, shuffleId, true, createData(0, (int) bucket));
+              assertEquals(StatusCode.SUCCESS, sc);
+            });
+    // check metrics values
+    List<Collector.MetricFamilySamples> samples =
+        ShuffleServerMetrics.appHistogramWriteBlockSize.collect();
+    assertEquals(samples.size(), 1);
+    int index = 1;
+    Arrays.stream(buckets)
+        .sorted()
+        .forEach(
+            bucket -> {
+              for (Collector.MetricFamilySamples.Sample s : 
samples.get(0).samples) {
+                if (s.labelValues.contains(bucket)) {
+                  assertEquals(s.value, index);
+                }
+              }
+            });
+  }
 }

Reply via email to