This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 89d2f1c8d [#1585] feat(server): Support app-level block size
statistics to report metrics (#1593)
89d2f1c8d is described below
commit 89d2f1c8d08df264afd6eb8f7c956eecc8182e5a
Author: leslizhang <[email protected]>
AuthorDate: Tue Apr 30 19:30:56 2024 +0800
[#1585] feat(server): Support app-level block size statistics to report
metrics (#1593)
### What changes were proposed in this pull request?
Added a shuffle block size metric of type histogram.
### Why are the changes needed?
related feature https://github.com/apache/incubator-uniffle/issues/1585
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
new UTs.
Co-authored-by: leslizhang <[email protected]>
Co-authored-by: Enrico Minack <[email protected]>
---
.../spark/shuffle/writer/WriteBufferManager.java | 9 +++++
.../apache/uniffle/common/config/ConfigUtils.java | 11 ++++++
.../uniffle/common/metrics/MetricsManager.java | 4 ++
.../org/apache/uniffle/common/util/Constants.java | 1 +
.../org/apache/uniffle/server/ShuffleServer.java | 2 +-
.../apache/uniffle/server/ShuffleServerConf.java | 14 +++++++
.../uniffle/server/ShuffleServerMetrics.java | 28 ++++++++++++--
.../server/buffer/ShuffleBufferManager.java | 15 ++++++++
.../server/buffer/ShuffleBufferManagerTest.java | 44 ++++++++++++++++++++++
9 files changed, 123 insertions(+), 5 deletions(-)
diff --git
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
index efe376a34..f5fa497bb 100644
---
a/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
+++
b/client-spark/common/src/main/java/org/apache/spark/shuffle/writer/WriteBufferManager.java
@@ -179,7 +179,14 @@ public class WriteBufferManager extends MemoryConsumer {
// check buffer size > spill threshold
if (usedBytes.get() - inSendListBytes.get() > spillSize) {
+ LOG.info(
+ "ShuffleBufferManager spill for buffer size exceeding spill
threshold,"
+ + "usedBytes[{}],inSendListBytes[{}],spillSize[{}]",
+ usedBytes.get(),
+ inSendListBytes.get(),
+ spillSize);
List<ShuffleBlockInfo> multiSendingBlocks = clear();
+
multiSendingBlocks.addAll(singleOrEmptySendingBlocks);
writeTime += System.currentTimeMillis() - start;
return multiSendingBlocks;
@@ -316,6 +323,8 @@ public class WriteBufferManager extends MemoryConsumer {
+ dataSize
+ "], memoryUsed["
+ memoryUsed
+ + "], number of blocks["
+ + result.size()
+ "]");
return result;
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
index 14777e8e0..64f0ee1d5 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/ConfigUtils.java
@@ -181,6 +181,17 @@ public class ConfigUtils {
return Double.parseDouble(o.toString());
}
+  /**
+   * Parses a comma-separated list of byte-size strings (for example {@code "32kb,64kb,1mb"})
+   * into an array of doubles, one per entry, in the order the entries appear.
+   *
+   * <p>Each entry is trimmed before it is handed to {@code UnitConverter.byteStringAsBytes},
+   * and entries that are empty after trimming are skipped, so {@code ""}, {@code "32kb, 64kb"}
+   * and {@code "32kb,,64kb"} are all accepted. No sorting or de-duplication is performed.
+   *
+   * @param o the raw value; its {@code toString()} form is parsed
+   * @return the parsed values, or an empty array when {@code o} is {@code null} or contains no
+   *     non-empty entry
+   */
+  public static double[] convertBytesStringToDoubleArray(Object o) {
+    if (o == null) {
+      return new double[0];
+    }
+    return Arrays.stream(o.toString().split(","))
+        .map(String::trim)
+        // "".split(",") yields a single empty token; drop it instead of passing it to the parser.
+        .filter(token -> !token.isEmpty())
+        // mapToDouble keeps the pipeline primitive; the parsed value is widened to double.
+        .mapToDouble(UnitConverter::byteStringAsBytes)
+        .toArray();
+  }
+
@SuppressWarnings("unchecked")
public static List<ConfigOption<Object>> getAllConfigOptions(
Class<? extends RssBaseConf> confClass) {
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
index 10b7542ac..e2ae3106c 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
@@ -83,6 +83,10 @@ public class MetricsManager {
return addHistogram(name, "Histogram " + name, buckets, labels);
}
+  /**
+   * Creates a histogram through the four-argument {@code addHistogram} overload, using this
+   * manager's {@code defaultLabelNames} as the label names.
+   *
+   * @param name metric name; also used to build the help text {@code "Histogram " + name}
+   * @param buckets bucket values, passed through unchanged
+   * @return whatever the four-argument overload returns
+   */
+  public Histogram addHistogram(String name, double[] buckets) {
+    final String help = "Histogram " + name;
+    return addHistogram(name, help, buckets, defaultLabelNames);
+  }
+
public Histogram addHistogram(String name, String help, double[] buckets,
String[] labels) {
return Histogram.build()
.name(name)
diff --git a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
index 1f4dd0b06..ccaa51970 100644
--- a/common/src/main/java/org/apache/uniffle/common/util/Constants.java
+++ b/common/src/main/java/org/apache/uniffle/common/util/Constants.java
@@ -24,6 +24,7 @@ public final class Constants {
// the value is used for client/server compatible, eg, online upgrade
public static final String SHUFFLE_SERVER_VERSION = "ss_v5";
public static final String METRICS_TAG_LABEL_NAME = "tags";
+ public static final String METRICS_APP_LABEL_NAME = "appId";
public static final String COORDINATOR_TAG = "coordinator";
public static final String SHUFFLE_DATA_FILE_SUFFIX = ".data";
public static final String SHUFFLE_INDEX_FILE_SUFFIX = ".index";
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index bab2bcf44..79fe35b76 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -316,7 +316,7 @@ public class ShuffleServer {
LOG.info("Register metrics");
CollectorRegistry shuffleServerCollectorRegistry = new
CollectorRegistry(true);
String rawTags = getEncodedTags();
- ShuffleServerMetrics.register(shuffleServerCollectorRegistry, rawTags);
+ ShuffleServerMetrics.register(shuffleServerCollectorRegistry, rawTags,
shuffleServerConf);
grpcMetrics = new ShuffleServerGrpcMetrics(this.shuffleServerConf,
rawTags);
grpcMetrics.register(new CollectorRegistry(true));
nettyMetrics = new ShuffleServerNettyMetrics(shuffleServerConf, rawTags);
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index a7d8254f4..088252779 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -572,6 +572,20 @@ public class ShuffleServerConf extends RssBaseConf {
.withDescription(
"keep alive time of thread pool that used for calc summary
metric, in SECONDS.");
+ /** Toggles the app-level shuffle block size metric; the default is {@code false}. */
+ public static final ConfigOption<Boolean>
APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED =
+ ConfigOptions.key("rss.server.metrics.blockSizeStatisticsEnabled")
+ .booleanType()
+ .defaultValue(false)
+ .withDescription("whether or not shuffle block size metric is
enabled");
+
+ /**
+  * Comma-separated block sizes used as the buckets of the block size metric. The default list
+  * runs from 32kb to 16mb, doubling at each step.
+  */
+ public static final ConfigOption<String>
APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_BUCKETS =
+ ConfigOptions.key("rss.server.metrics.blockSizeStatistics.buckets")
+ .stringType()
+ .defaultValue("32kb,64kb,128kb,256kb,512kb,1mb,2mb,4mb,8mb,16mb")
+ .withDescription(
+ "A comma-separated block size list, where each value"
+ + " can be suffixed with a memory size unit, such as kb or
k, mb or m, etc.");
+
public ShuffleServerConf() {}
public ShuffleServerConf(String fileName) {
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index b4566b366..bee4dd48c 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -24,18 +24,23 @@ import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
+import io.prometheus.client.Histogram;
import io.prometheus.client.Summary;
import org.apache.commons.lang3.StringUtils;
+import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.metrics.MetricsManager;
import org.apache.uniffle.common.util.Constants;
import org.apache.uniffle.storage.common.LocalStorage;
+import static org.apache.uniffle.common.util.Constants.METRICS_APP_LABEL_NAME;
+
public class ShuffleServerMetrics {
private static final String TOTAL_RECEIVED_DATA = "total_received_data";
private static final String TOTAL_WRITE_DATA = "total_write_data";
private static final String TOTAL_WRITE_BLOCK = "total_write_block";
+ private static final String WRITE_BLOCK_SIZE = "write_block_size";
private static final String TOTAL_WRITE_TIME = "total_write_time";
private static final String TOTAL_WRITE_HANDLER = "total_write_handler";
private static final String TOTAL_WRITE_EXCEPTION = "total_write_exception";
@@ -148,6 +153,7 @@ public class ShuffleServerMetrics {
public static Counter.Child counterTotalReceivedDataSize;
public static Counter.Child counterTotalWriteDataSize;
public static Counter.Child counterTotalWriteBlockSize;
+ public static Histogram appHistogramWriteBlockSize;
public static Counter.Child counterTotalWriteTime;
public static Counter.Child counterWriteException;
public static Counter.Child counterWriteSlow;
@@ -231,20 +237,28 @@ public class ShuffleServerMetrics {
private static MetricsManager metricsManager;
private static boolean isRegister = false;
- public static synchronized void register(CollectorRegistry
collectorRegistry, String tags) {
+ public static synchronized void register(
+ CollectorRegistry collectorRegistry, String tags, ShuffleServerConf
serverConf) {
if (!isRegister) {
ShuffleServerMetrics.tags = tags;
Map<String, String> labels = Maps.newHashMap();
labels.put(Constants.METRICS_TAG_LABEL_NAME, ShuffleServerMetrics.tags);
metricsManager = new MetricsManager(collectorRegistry, labels);
isRegister = true;
- setUpMetrics();
+ setUpMetrics(serverConf);
}
}
+  /**
+   * Convenience overload of the three-argument {@code register}: uses
+   * {@code CollectorRegistry.defaultRegistry} as the registry and
+   * {@code Constants.SHUFFLE_SERVER_VERSION} as the tags value.
+   *
+   * @param serverConf server configuration forwarded to the three-argument overload
+   */
+  public static void register(ShuffleServerConf serverConf) {
+    final CollectorRegistry registry = CollectorRegistry.defaultRegistry;
+    final String tags = Constants.SHUFFLE_SERVER_VERSION;
+    register(registry, tags, serverConf);
+  }
+
@VisibleForTesting
public static void register() {
- register(CollectorRegistry.defaultRegistry,
Constants.SHUFFLE_SERVER_VERSION);
+ register(
+ CollectorRegistry.defaultRegistry,
+ Constants.SHUFFLE_SERVER_VERSION,
+ new ShuffleServerConf());
}
@VisibleForTesting
@@ -314,10 +328,16 @@ public class ShuffleServerMetrics {
incHadoopStorageWriteDataSize(storageHost, size, false);
}
- private static void setUpMetrics() {
+ private static void setUpMetrics(ShuffleServerConf serverConf) {
counterTotalReceivedDataSize =
metricsManager.addLabeledCounter(TOTAL_RECEIVED_DATA);
counterTotalWriteDataSize =
metricsManager.addLabeledCounter(TOTAL_WRITE_DATA);
counterTotalWriteBlockSize =
metricsManager.addLabeledCounter(TOTAL_WRITE_BLOCK);
+ appHistogramWriteBlockSize =
+ metricsManager.addHistogram(
+ WRITE_BLOCK_SIZE,
+ ConfigUtils.convertBytesStringToDoubleArray(
+
serverConf.get(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_BUCKETS)),
+ METRICS_APP_LABEL_NAME);
counterTotalWriteTime = metricsManager.addLabeledCounter(TOTAL_WRITE_TIME);
counterWriteException =
metricsManager.addLabeledCounter(TOTAL_WRITE_EXCEPTION);
counterWriteSlow = metricsManager.addLabeledCounter(TOTAL_WRITE_SLOW);
diff --git
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
index 578b6c498..8c1e39cf2 100644
---
a/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
+++
b/server/src/main/java/org/apache/uniffle/server/buffer/ShuffleBufferManager.java
@@ -17,6 +17,7 @@
package org.apache.uniffle.server.buffer;
+import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
@@ -81,6 +82,7 @@ public class ShuffleBufferManager {
protected Map<String, Map<Integer, RangeMap<Integer, ShuffleBuffer>>>
bufferPool;
// appId -> shuffleId -> shuffle size in buffer
protected Map<String, Map<Integer, AtomicLong>> shuffleSizeMap =
JavaUtils.newConcurrentMap();
+ private final boolean appBlockSizeMetricEnabled;
public ShuffleBufferManager(
ShuffleServerConf conf, ShuffleFlushManager shuffleFlushManager, boolean
nettyServerEnabled) {
@@ -130,6 +132,8 @@ public class ShuffleBufferManager {
this.hugePartitionMemoryLimitSize =
Math.round(
capacity *
conf.get(ShuffleServerConf.HUGE_PARTITION_MEMORY_USAGE_LIMITATION_RATIO));
+ appBlockSizeMetricEnabled =
+
conf.getBoolean(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED);
}
public void setShuffleTaskManager(ShuffleTaskManager taskManager) {
@@ -180,6 +184,14 @@ public class ShuffleBufferManager {
if (!isPreAllocated) {
updateUsedMemory(size);
}
+ if (appBlockSizeMetricEnabled) {
+ Arrays.stream(spd.getBlockList())
+ .forEach(
+ b -> {
+ int blockSize = b.getLength();
+
ShuffleServerMetrics.appHistogramWriteBlockSize.labels(appId).observe(blockSize);
+ });
+ }
updateShuffleSize(appId, shuffleId, size);
synchronized (this) {
flushSingleBufferIfNecessary(
@@ -337,6 +349,9 @@ public class ShuffleBufferManager {
removeBufferByShuffleId(appId, shuffleIdToBuffers.keySet());
shuffleSizeMap.remove(appId);
bufferPool.remove(appId);
+ if (appBlockSizeMetricEnabled) {
+ ShuffleServerMetrics.appHistogramWriteBlockSize.remove(appId);
+ }
}
public synchronized boolean requireMemory(long size, boolean isPreAllocated)
{
diff --git
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
index 08428cc74..56d218980 100644
---
a/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
+++
b/server/src/test/java/org/apache/uniffle/server/buffer/ShuffleBufferManagerTest.java
@@ -20,6 +20,7 @@ package org.apache.uniffle.server.buffer;
import java.io.File;
import java.time.Duration;
import java.util.Arrays;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -28,6 +29,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import com.google.common.collect.RangeMap;
import com.google.common.util.concurrent.Uninterruptibles;
+import io.prometheus.client.Collector;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -38,6 +40,7 @@ import org.apache.uniffle.common.PartitionRange;
import org.apache.uniffle.common.RemoteStorageInfo;
import org.apache.uniffle.common.ShuffleDataResult;
import org.apache.uniffle.common.ShufflePartitionedData;
+import org.apache.uniffle.common.config.ConfigUtils;
import org.apache.uniffle.common.rpc.StatusCode;
import org.apache.uniffle.common.util.ByteBufUtils;
import org.apache.uniffle.common.util.Constants;
@@ -729,4 +732,45 @@ public class ShuffleBufferManagerTest extends
BufferTestBase {
assertEquals(0, shuffleBufferManager.getUsedMemory());
assertEquals(0, shuffleBufferManager.getInFlushSize());
}
+
+  /**
+   * Caches one block per configured bucket boundary (block length == boundary) and checks the
+   * cumulative bucket counts the app-level block size histogram reports for this app.
+   */
+  @Test
+  public void blockSizeMetricsTest() {
+    String appId = "blockSizeMetricsTest";
+    shuffleBufferManager.setShuffleTaskManager(mockShuffleTaskManager);
+    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+    when(mockShuffleTaskManager.getAppReadLock(appId)).thenReturn(rwLock.readLock());
+    when(mockShuffleServer.getShuffleTaskManager()).thenReturn(mockShuffleTaskManager);
+    int shuffleId = 1;
+    shuffleBufferManager.registerBuffer(appId, shuffleId, 0, 1);
+
+    // cache shuffle block data, and record metrics
+    double[] buckets =
+        ConfigUtils.convertBytesStringToDoubleArray(
+            new ShuffleServerConf()
+                .get(ShuffleServerConf.APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_BUCKETS));
+    Arrays.sort(buckets);
+    for (double bucket : buckets) {
+      StatusCode sc =
+          shuffleBufferManager.cacheShuffleData(
+              appId, shuffleId, true, createData(0, (int) bucket));
+      assertEquals(StatusCode.SUCCESS, sc);
+    }
+
+    // check metrics values
+    // NOTE(review): the histogram is only fed when the manager was built with
+    // APP_LEVEL_SHUFFLE_BLOCK_SIZE_METRIC_ENABLED set (it defaults to false); confirm the
+    // fixture enables it, otherwise no bucket sample for this app exists and the loop below
+    // asserts nothing.
+    List<Collector.MetricFamilySamples> samples =
+        ShuffleServerMetrics.appHistogramWriteBlockSize.collect();
+    assertEquals(1, samples.size());
+    for (int i = 0; i < buckets.length; i++) {
+      // Label values are strings, so the boundary must be compared in its rendered form.
+      String le = Collector.doubleToGoString(buckets[i]);
+      // Bucket counts are cumulative: the i-th boundary has seen the i + 1 smallest blocks.
+      double expectedCount = i + 1;
+      for (Collector.MetricFamilySamples.Sample s : samples.get(0).samples) {
+        boolean isBucketOfThisApp =
+            s.name.endsWith("_bucket")
+                && s.labelValues.contains(appId)
+                && s.labelValues.contains(le);
+        if (isBucketOfThisApp) {
+          assertEquals(expectedCount, s.value);
+        }
+      }
+    }
+  }
}