This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 9bd5fd856 [#1189] feat(server): Add netty used direct memory size
metric (#1363)
9bd5fd856 is described below
commit 9bd5fd85694dd976948d8809078caa4609c4cf2f
Author: Qing <[email protected]>
AuthorDate: Sat Dec 16 16:19:13 2023 +0800
[#1189] feat(server): Add netty used direct memory size metric (#1363)
### What changes were proposed in this pull request?
Add a Netty direct memory usage metric to ShuffleServer for monitoring.
### Why are the changes needed?
Fix: #1189 (https://github.com/apache/incubator-uniffle/issues/1189)
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
---
.../uniffle/server/NettyDirectMemoryTracker.java | 73 ++++++++++++++++++++++
.../org/apache/uniffle/server/ShuffleServer.java | 6 ++
.../apache/uniffle/server/ShuffleServerConf.java | 12 ++++
.../uniffle/server/ShuffleServerMetrics.java | 3 +
4 files changed, 94 insertions(+)
diff --git
a/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java
b/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java
new file mode 100644
index 000000000..7506aed17
--- /dev/null
+++
b/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.ThreadUtils;
+
+public class NettyDirectMemoryTracker {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(NettyDirectMemoryTracker.class);
+
+ private final long reportInitialDelay;
+ private final long reportInterval;
+ private final ScheduledExecutorService service =
+ Executors.newSingleThreadScheduledExecutor(
+ ThreadUtils.getThreadFactory("NettyDirectMemoryTracker"));
+
+ public NettyDirectMemoryTracker(ShuffleServerConf conf) {
+ this.reportInitialDelay =
+
conf.getLong(ShuffleServerConf.SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_DELAY);
+ this.reportInterval =
+
conf.getLong(ShuffleServerConf.SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_INTERVAL);
+ }
+
+ public void start() {
+ LOG.info(
+ "Start report direct memory usage to MetricSystem after {}ms and
interval is {}ms",
+ reportInitialDelay,
+ reportInterval);
+
+ service.scheduleAtFixedRate(
+ () -> {
+ try {
+ long usedDirectMemory = PlatformDependent.usedDirectMemory();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Current direct memory usage: {}", usedDirectMemory);
+ }
+
ShuffleServerMetrics.gaugeUsedDirectMemorySize.set(usedDirectMemory);
+ } catch (Throwable t) {
+ LOG.error("Failed to report direct memory.", t);
+ }
+ },
+ reportInitialDelay,
+ reportInterval,
+ TimeUnit.MILLISECONDS);
+ }
+
+ public void stop() {
+ service.shutdownNow();
+ }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index a5e230eb3..045b6f93f 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -75,6 +75,7 @@ public class ShuffleServer {
private static final Logger LOG =
LoggerFactory.getLogger(ShuffleServer.class);
private RegisterHeartBeat registerHeartBeat;
+ private NettyDirectMemoryTracker directMemoryUsageReporter;
private String id;
private String ip;
private int grpcPort;
@@ -168,6 +169,10 @@ public class ShuffleServer {
registerHeartBeat.shutdown();
LOG.info("HeartBeat Stopped!");
}
+ if (directMemoryUsageReporter != null) {
+ directMemoryUsageReporter.stop();
+ LOG.info("Direct memory usage tracker Stopped!");
+ }
if (storageManager != null) {
storageManager.stop();
LOG.info("MultiStorage Stopped!");
@@ -259,6 +264,7 @@ public class ShuffleServer {
}
registerHeartBeat = new RegisterHeartBeat(this);
+ directMemoryUsageReporter = new
NettyDirectMemoryTracker(shuffleServerConf);
shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, this,
storageManager);
shuffleBufferManager = new ShuffleBufferManager(shuffleServerConf,
shuffleFlushManager);
shuffleTaskManager =
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 802455a42..0af62d420 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -77,6 +77,18 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(10 * 1000L)
.withDescription("Heartbeat interval to Coordinator (ms)");
+ public static final ConfigOption<Long>
SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_DELAY =
+
ConfigOptions.key("rss.server.netty.directMemoryTracker.memoryUsage.initialFetchDelayMs")
+ .longType()
+ .defaultValue(10 * 1000L)
+ .withDescription("Direct memory usage tracker initial delay (ms)");
+
+ public static final ConfigOption<Long>
SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_INTERVAL =
+
ConfigOptions.key("rss.server.netty.directMemoryTracker.memoryUsage.updateMetricsIntervalMs")
+ .longType()
+ .defaultValue(10 * 1000L)
+ .withDescription("Direct memory usage tracker interval to
MetricSystem (ms)");
+
public static final ConfigOption<Integer>
SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE =
ConfigOptions.key("rss.server.flush.localfile.threadPool.size")
.intType()
diff --git
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 8f6f177b1..7ab54e9c7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -72,6 +72,7 @@ public class ShuffleServerMetrics {
private static final String IN_FLUSH_BUFFER_SIZE = "in_flush_buffer_size";
private static final String USED_BUFFER_SIZE = "used_buffer_size";
private static final String READ_USED_BUFFER_SIZE = "read_used_buffer_size";
+ private static final String USED_DIRECT_MEMORY_SIZE =
"used_direct_memory_size";
private static final String TOTAL_FAILED_WRITTEN_EVENT_NUM =
"total_failed_written_event_num";
private static final String TOTAL_DROPPED_EVENT_NUM =
"total_dropped_event_num";
private static final String TOTAL_HADOOP_WRITE_DATA =
"total_hadoop_write_data";
@@ -163,6 +164,7 @@ public class ShuffleServerMetrics {
public static Gauge.Child gaugeInFlushBufferSize;
public static Gauge.Child gaugeUsedBufferSize;
public static Gauge.Child gaugeReadBufferUsedSize;
+ public static Gauge.Child gaugeUsedDirectMemorySize;
public static Gauge.Child gaugeWriteHandler;
public static Gauge.Child gaugeEventQueueSize;
public static Gauge.Child gaugeAppNum;
@@ -328,6 +330,7 @@ public class ShuffleServerMetrics {
gaugeInFlushBufferSize =
metricsManager.addLabeledGauge(IN_FLUSH_BUFFER_SIZE);
gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE);
gaugeReadBufferUsedSize =
metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE);
+ gaugeUsedDirectMemorySize =
metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE);
gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER);
gaugeEventQueueSize = metricsManager.addLabeledGauge(EVENT_QUEUE_SIZE);
gaugeAppNum = metricsManager.addLabeledGauge(APP_NUM_WITH_NODE);