This is an automated email from the ASF dual-hosted git repository.
xianjin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 8dfa2a7e8 Refactor metrics system to reduce periodic reporting load
(#1991)
8dfa2a7e8 is described below
commit 8dfa2a7e8b486febce3dbedfe16cc65932163a77
Author: kqhzz <[email protected]>
AuthorDate: Wed Aug 14 20:39:24 2024 +0800
Refactor metrics system to reduce periodic reporting load (#1991)
### What changes were proposed in this pull request?
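Add a second way to register a gauge metric: instead of being set explicitly, the gauge's value
is described by a lambda (`Supplier<Double>`) that is evaluated each time the metric is collected.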
### Why are the changes needed?
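Fixes #1973. The event queue size gauge was incremented and decremented as events moved through
the flush path, and the Netty direct-memory gauges were refreshed by a scheduled
`NettyDirectMemoryTracker` thread; both are now computed only when metrics are collected, which
removes that periodic reporting work.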
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Unit tests.
---
.../uniffle/common/metrics/MetricsManager.java | 32 +++++++++
.../uniffle/common/metrics/SupplierGauge.java | 66 +++++++++++++++++
.../uniffle/server/DefaultFlushEventHandler.java | 9 ++-
.../uniffle/server/NettyDirectMemoryTracker.java | 82 ----------------------
.../org/apache/uniffle/server/ShuffleServer.java | 25 +++++--
.../uniffle/server/ShuffleServerMetrics.java | 23 +++---
6 files changed, 129 insertions(+), 108 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
index e2ae3106c..b26c055c7 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/MetricsManager.java
@@ -19,6 +19,7 @@ package org.apache.uniffle.common.metrics;
import java.util.Arrays;
import java.util.Map;
+import java.util.function.Supplier;
import com.google.common.collect.Maps;
import io.prometheus.client.CollectorRegistry;
@@ -27,12 +28,15 @@ import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import io.prometheus.client.Summary;
+import org.apache.uniffle.common.util.JavaUtils;
+
public class MetricsManager {
private final CollectorRegistry collectorRegistry;
private final String[] defaultLabelNames;
private final String[] defaultLabelValues;
private static final double[] QUANTILES = {0.50, 0.75, 0.90, 0.95, 0.99};
private static final double QUANTILE_ERROR = 0.01;
+ private Map<String, SupplierGauge> supplierGaugeMap;
public MetricsManager() {
this(null, Maps.newHashMap());
@@ -47,6 +51,7 @@ public class MetricsManager {
this.defaultLabelNames = defaultLabels.keySet().toArray(new String[0]);
this.defaultLabelValues =
Arrays.stream(defaultLabelNames).map(defaultLabels::get).toArray(String[]::new);
+ this.supplierGaugeMap = JavaUtils.newConcurrentMap();
}
public CollectorRegistry getCollectorRegistry() {
@@ -79,6 +84,19 @@ public class MetricsManager {
return c.labels(this.defaultLabelValues);
}
+  /**
+   * Registers a gauge named {@code name} whose value is obtained from {@code supplier} each time
+   * the registry collects it. The gauge carries this manager's default labels.
+   *
+   * <p>If a supplier gauge with this name was added earlier, it is unregistered and replaced so
+   * that the newest supplier is the one reported. Keeping the first registration instead would
+   * leave the metric bound to whatever object the old lambda captured (for example the queue of
+   * a handler that has since been re-created).
+   *
+   * @param name metric name, also used to build the help text
+   * @param supplier value source, invoked on every collection
+   */
+  public void addLabeledGauge(String name, Supplier<Double> supplier) {
+    supplierGaugeMap.compute(
+        name,
+        (metricName, previous) -> {
+          if (previous != null) {
+            // The registry rejects a second collector for the same name, so drop the old one.
+            collectorRegistry.unregister(previous);
+          }
+          return new SupplierGauge(
+                  metricName,
+                  "Gauge " + metricName,
+                  supplier,
+                  this.defaultLabelNames,
+                  this.defaultLabelValues)
+              .register(collectorRegistry);
+        });
+  }
+
public Histogram addHistogram(String name, double[] buckets, String...
labels) {
return addHistogram(name, "Histogram " + name, buckets, labels);
}
@@ -112,4 +130,18 @@ public class MetricsManager {
}
return builder.register(collectorRegistry).labels(defaultLabelValues);
}
+
+ public void unregisterAllSupplierGauge() {
+ for (SupplierGauge gauge : supplierGaugeMap.values()) {
+ collectorRegistry.unregister(gauge);
+ }
+ supplierGaugeMap.clear();
+ }
+
+ public void unregisterSupplierGauge(String name) {
+ if (supplierGaugeMap.containsKey(name)) {
+ collectorRegistry.unregister(supplierGaugeMap.get(name));
+ supplierGaugeMap.remove(name);
+ }
+ }
}
diff --git
a/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
new file mode 100644
index 000000000..674980def
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/SupplierGauge.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.metrics;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.function.Supplier;
+
+import io.prometheus.client.Collector;
+import io.prometheus.client.GaugeMetricFamily;
+
+/**
+ * A Prometheus gauge whose single sample is produced by a {@link Supplier} each time the collector
+ * is collected, instead of being pushed with explicit set/inc/dec calls.
+ *
+ * <p>The sample is emitted with a fixed set of label names and values given at construction.
+ */
+final class SupplierGauge extends Collector implements Collector.Describable {
+  private final String name;
+  private final String help;
+  private final Supplier<Double> supplier;
+  private final List<String> labelNames;
+  private final List<String> labelValues;
+
+  /**
+   * @param name metric name
+   * @param help help text exposed with the metric
+   * @param supplier value source, invoked on every {@link #collect()}
+   * @param labelNames label names attached to the sample
+   * @param labelValues label values, positionally matching {@code labelNames}
+   */
+  SupplierGauge(
+      String name,
+      String help,
+      Supplier<Double> supplier,
+      String[] labelNames,
+      String[] labelValues) {
+    this.name = name;
+    this.help = help;
+    this.supplier = supplier;
+    // Copy the arrays: Arrays.asList alone would be a live view of the caller's arrays.
+    this.labelNames = Collections.unmodifiableList(Arrays.asList(labelNames.clone()));
+    this.labelValues = Collections.unmodifiableList(Arrays.asList(labelValues.clone()));
+  }
+
+  @Override
+  public List<MetricFamilySamples> collect() {
+    List<MetricFamilySamples.Sample> samples = new ArrayList<>(1);
+    samples.add(
+        new MetricFamilySamples.Sample(
+            this.name, this.labelNames, this.labelValues, currentValue()));
+    List<MetricFamilySamples> mfsList = new ArrayList<>(1);
+    mfsList.add(new MetricFamilySamples(this.name, Type.GAUGE, this.help, samples));
+    return mfsList;
+  }
+
+  @Override
+  public List<MetricFamilySamples> describe() {
+    return Collections.<MetricFamilySamples>singletonList(
+        new GaugeMetricFamily(this.name, this.help, this.labelNames));
+  }
+
+  /**
+   * Evaluates the supplier, reporting {@code NaN} when it returns {@code null} or throws.
+   *
+   * <p>Without this, a null result would fail on unboxing and any exception would propagate out
+   * of {@link #collect()} to whoever is iterating the registry, rather than affecting only this
+   * gauge. NaN keeps the failure visible in the exported value.
+   */
+  private double currentValue() {
+    try {
+      Double value = supplier.get();
+      return value == null ? Double.NaN : value;
+    } catch (RuntimeException e) {
+      return Double.NaN;
+    }
+  }
+}
diff --git
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
index f32a96101..e0a47526f 100644
---
a/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
+++
b/server/src/main/java/org/apache/uniffle/server/DefaultFlushEventHandler.java
@@ -41,6 +41,8 @@ import org.apache.uniffle.storage.common.LocalStorage;
import org.apache.uniffle.storage.common.Storage;
import org.apache.uniffle.storage.util.StorageType;
+import static org.apache.uniffle.server.ShuffleServerMetrics.EVENT_QUEUE_SIZE;
+
public class DefaultFlushEventHandler implements FlushEventHandler {
private static final Logger LOG =
LoggerFactory.getLogger(DefaultFlushEventHandler.class);
@@ -77,8 +79,6 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
// We need to release the memory when discarding the event
event.doCleanup();
ShuffleServerMetrics.counterTotalDroppedEventNum.inc();
- } else {
- ShuffleServerMetrics.gaugeEventQueueSize.inc();
}
}
@@ -160,8 +160,6 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
} else {
ShuffleServerMetrics.gaugeFallbackFlushThreadPoolQueueSize.dec();
}
-
- ShuffleServerMetrics.gaugeEventQueueSize.dec();
}
}
@@ -178,6 +176,7 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
hadoopThreadPoolExecutor = createFlushEventExecutor(poolSize,
"HadoopFlushEventThreadPool");
}
fallbackThreadPoolExecutor = createFlushEventExecutor(5,
"FallBackFlushEventThreadPool");
+ ShuffleServerMetrics.addLabeledGauge(EVENT_QUEUE_SIZE, () -> (double)
flushQueue.size());
startEventProcessor();
}
@@ -248,7 +247,7 @@ public class DefaultFlushEventHandler implements
FlushEventHandler {
+  /**
+   * Returns the number of events currently waiting in {@code flushQueue}.
+   *
+   * <p>NOTE(review): previously this read a gauge that was incremented when an event was queued
+   * and appears to have been decremented only at the end of event handling, so it also counted
+   * events already taken off the queue and being flushed. {@code flushQueue.size()} excludes
+   * those in-flight events — confirm that callers waiting for flushes to drain still behave
+   * correctly.
+   */
@Override
public int getEventNumInFlush() {
- return (int) ShuffleServerMetrics.gaugeEventQueueSize.get();
+ return flushQueue.size();
}
@Override
diff --git
a/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java
b/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java
deleted file mode 100644
index e9eb17060..000000000
---
a/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.uniffle.server;
-
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.util.internal.PlatformDependent;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.uniffle.common.util.ThreadUtils;
-
-public class NettyDirectMemoryTracker {
-
- private static final Logger LOG =
LoggerFactory.getLogger(NettyDirectMemoryTracker.class);
-
- private final long reportInitialDelay;
- private final long reportInterval;
- private final ScheduledExecutorService service =
- Executors.newSingleThreadScheduledExecutor(
- ThreadUtils.getThreadFactory("NettyDirectMemoryTracker"));
-
- public NettyDirectMemoryTracker(ShuffleServerConf conf) {
- this.reportInitialDelay =
-
conf.getLong(ShuffleServerConf.SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_DELAY);
- this.reportInterval =
-
conf.getLong(ShuffleServerConf.SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_INTERVAL);
- }
-
- public void start() {
- LOG.info(
- "Start report direct memory usage to MetricSystem after {}ms and
interval is {}ms",
- reportInitialDelay,
- reportInterval);
-
- service.scheduleAtFixedRate(
- () -> {
- try {
- long usedDirectMemoryByNetty =
PlatformDependent.usedDirectMemory();
- long usedDirectMemoryByGrpcNetty =
-
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.usedDirectMemory();
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Current usedDirectMemoryByNetty:{},
usedDirectMemoryByGrpcNetty:{}",
- usedDirectMemoryByNetty,
- usedDirectMemoryByGrpcNetty);
- }
-
ShuffleServerMetrics.gaugeUsedDirectMemorySizeByNetty.set(usedDirectMemoryByNetty);
- ShuffleServerMetrics.gaugeUsedDirectMemorySizeByGrpcNetty.set(
- usedDirectMemoryByGrpcNetty);
- ShuffleServerMetrics.gaugeUsedDirectMemorySize.set(
- usedDirectMemoryByNetty + usedDirectMemoryByGrpcNetty);
- } catch (Throwable t) {
- LOG.error("Failed to report direct memory.", t);
- }
- },
- reportInitialDelay,
- reportInterval,
- TimeUnit.MILLISECONDS);
- }
-
- public void stop() {
- service.shutdownNow();
- }
-}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index 60be04b65..4b59cb5ce 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
+import io.netty.util.internal.PlatformDependent;
import io.prometheus.client.CollectorRegistry;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
@@ -73,13 +74,15 @@ import static
org.apache.uniffle.common.config.RssBaseConf.RSS_STORAGE_TYPE;
import static
org.apache.uniffle.common.config.RssBaseConf.RSS_TEST_MODE_ENABLE;
import static
org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_CHECK_INTERVAL;
import static
org.apache.uniffle.server.ShuffleServerConf.SERVER_DECOMMISSION_SHUTDOWN;
+import static
org.apache.uniffle.server.ShuffleServerMetrics.USED_DIRECT_MEMORY_SIZE;
+import static
org.apache.uniffle.server.ShuffleServerMetrics.USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY;
+import static
org.apache.uniffle.server.ShuffleServerMetrics.USED_DIRECT_MEMORY_SIZE_BY_NETTY;
/** Server that manages startup/shutdown of a {@code Greeter} server. */
public class ShuffleServer {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleServer.class);
private RegisterHeartBeat registerHeartBeat;
- private NettyDirectMemoryTracker directMemoryUsageReporter;
private String id;
private String ip;
private int grpcPort;
@@ -156,7 +159,6 @@ public class ShuffleServer {
initMetricsReporter();
registerHeartBeat.startHeartBeat();
- directMemoryUsageReporter.start();
Runtime.getRuntime()
.addShutdownHook(
new Thread() {
@@ -184,10 +186,6 @@ public class ShuffleServer {
registerHeartBeat.shutdown();
LOG.info("HeartBeat Stopped!");
}
- if (directMemoryUsageReporter != null) {
- directMemoryUsageReporter.stop();
- LOG.info("Direct memory usage tracker Stopped!");
- }
if (storageManager != null) {
storageManager.stop();
LOG.info("MultiStorage Stopped!");
@@ -304,7 +302,6 @@ public class ShuffleServer {
}
registerHeartBeat = new RegisterHeartBeat(this);
- directMemoryUsageReporter = new
NettyDirectMemoryTracker(shuffleServerConf);
shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, this,
storageManager);
shuffleBufferManager =
new ShuffleBufferManager(shuffleServerConf, shuffleFlushManager,
nettyServerEnabled);
@@ -320,6 +317,20 @@ public class ShuffleServer {
storageManager,
shuffleMergeManager);
shuffleTaskManager.start();
+ ShuffleServerMetrics.addLabeledGauge(
+ USED_DIRECT_MEMORY_SIZE_BY_NETTY, () -> (double)
PlatformDependent.usedDirectMemory());
+ ShuffleServerMetrics.addLabeledGauge(
+ USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY,
+ () ->
+ (double)
+
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.usedDirectMemory());
+ ShuffleServerMetrics.addLabeledGauge(
+ USED_DIRECT_MEMORY_SIZE,
+ () ->
+ (double)
+ (PlatformDependent.usedDirectMemory()
+ +
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent
+ .usedDirectMemory()));
setServer();
}
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 3e886407c..b3e56c0b3 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -18,6 +18,7 @@
package org.apache.uniffle.server;
import java.util.Map;
+import java.util.function.Supplier;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
@@ -57,7 +58,7 @@ public class ShuffleServerMetrics {
private static final String EVENT_SIZE_THRESHOLD_LEVEL2 =
"event_size_threshold_level2";
private static final String EVENT_SIZE_THRESHOLD_LEVEL3 =
"event_size_threshold_level3";
private static final String EVENT_SIZE_THRESHOLD_LEVEL4 =
"event_size_threshold_level4";
- private static final String EVENT_QUEUE_SIZE = "event_queue_size";
+ public static final String EVENT_QUEUE_SIZE = "event_queue_size";
private static final String MERGE_EVENT_QUEUE_SIZE =
"merge_event_queue_size";
private static final String HADOOP_FLUSH_THREAD_POOL_QUEUE_SIZE =
"hadoop_flush_thread_pool_queue_size";
@@ -98,9 +99,9 @@ public class ShuffleServerMetrics {
private static final String IN_FLUSH_BUFFER_SIZE = "in_flush_buffer_size";
private static final String USED_BUFFER_SIZE = "used_buffer_size";
private static final String READ_USED_BUFFER_SIZE = "read_used_buffer_size";
- private static final String USED_DIRECT_MEMORY_SIZE =
"used_direct_memory_size";
- private static final String USED_DIRECT_MEMORY_SIZE_BY_NETTY =
"used_direct_memory_size_by_netty";
- private static final String USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY =
+ public static final String USED_DIRECT_MEMORY_SIZE =
"used_direct_memory_size";
+ public static final String USED_DIRECT_MEMORY_SIZE_BY_NETTY =
"used_direct_memory_size_by_netty";
+ public static final String USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY =
"used_direct_memory_size_by_grpc_netty";
private static final String TOTAL_FAILED_WRITTEN_EVENT_NUM =
"total_failed_written_event_num";
private static final String TOTAL_DROPPED_EVENT_NUM =
"total_dropped_event_num";
@@ -218,11 +219,7 @@ public class ShuffleServerMetrics {
public static Gauge.Child gaugeInFlushBufferSize;
public static Gauge.Child gaugeUsedBufferSize;
public static Gauge.Child gaugeReadBufferUsedSize;
- public static Gauge.Child gaugeUsedDirectMemorySize;
- public static Gauge.Child gaugeUsedDirectMemorySizeByNetty;
- public static Gauge.Child gaugeUsedDirectMemorySizeByGrpcNetty;
public static Gauge.Child gaugeWriteHandler;
- public static Gauge.Child gaugeEventQueueSize;
public static Gauge.Child gaugeMergeEventQueueSize;
public static Gauge.Child gaugeHadoopFlushThreadPoolQueueSize;
public static Gauge.Child gaugeLocalfileFlushThreadPoolQueueSize;
@@ -449,13 +446,7 @@ public class ShuffleServerMetrics {
gaugeInFlushBufferSize =
metricsManager.addLabeledGauge(IN_FLUSH_BUFFER_SIZE);
gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE);
gaugeReadBufferUsedSize =
metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE);
- gaugeUsedDirectMemorySize =
metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE);
- gaugeUsedDirectMemorySizeByNetty =
- metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE_BY_NETTY);
- gaugeUsedDirectMemorySizeByGrpcNetty =
- metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE_BY_GRPC_NETTY);
gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER);
- gaugeEventQueueSize = metricsManager.addLabeledGauge(EVENT_QUEUE_SIZE);
gaugeMergeEventQueueSize =
metricsManager.addLabeledGauge(MERGE_EVENT_QUEUE_SIZE);
gaugeHadoopFlushThreadPoolQueueSize =
metricsManager.addLabeledGauge(HADOOP_FLUSH_THREAD_POOL_QUEUE_SIZE);
@@ -521,4 +512,8 @@ public class ShuffleServerMetrics {
.labelNames("app_id")
.register(metricsManager.getCollectorRegistry());
}
+
+  /**
+   * Adds a gauge named {@code name} whose value is pulled from {@code supplier} whenever the
+   * metrics are collected, rather than being set explicitly.
+   *
+   * @param name metric name
+   * @param supplier value source
+   */
+  public static void addLabeledGauge(String name, Supplier<Double> supplier) {
+    ShuffleServerMetrics.metricsManager.addLabeledGauge(name, supplier);
+  }
}