This is an automated email from the ASF dual-hosted git repository.

zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bd5fd856 [#1189] feat(server): Add netty used direct memory size metric (#1363)
9bd5fd856 is described below

commit 9bd5fd85694dd976948d8809078caa4609c4cf2f
Author: Qing <[email protected]>
AuthorDate: Sat Dec 16 16:19:13 2023 +0800

    [#1189] feat(server): Add netty used direct memory size metric (#1363)
    
    ### What changes were proposed in this pull request?
    
    Add a used direct memory size metric to ShuffleServer for monitoring.
    
    ### Why are the changes needed?
    
    Fix: #1189 (https://github.com/apache/incubator-uniffle/issues/1189)
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
---
 .../uniffle/server/NettyDirectMemoryTracker.java   | 73 ++++++++++++++++++++++
 .../org/apache/uniffle/server/ShuffleServer.java   |  6 ++
 .../apache/uniffle/server/ShuffleServerConf.java   | 12 ++++
 .../uniffle/server/ShuffleServerMetrics.java       |  3 +
 4 files changed, 94 insertions(+)

diff --git a/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java b/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java
new file mode 100644
index 000000000..7506aed17
--- /dev/null
+++ b/server/src/main/java/org/apache/uniffle/server/NettyDirectMemoryTracker.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.server;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.util.ThreadUtils;
+
+/**
+ * Periodically samples the direct memory usage reported by Netty's {@code PlatformDependent} and
+ * publishes it to {@link ShuffleServerMetrics#gaugeUsedDirectMemorySize}.
+ *
+ * <p>The initial delay and the sampling interval, both in milliseconds, come from {@link
+ * ShuffleServerConf#SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_DELAY} and {@link
+ * ShuffleServerConf#SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_INTERVAL}. Samples are taken on a
+ * single dedicated scheduler thread between {@link #start()} and {@link #stop()}.
+ *
+ * <p>NOTE(review): {@code PlatformDependent} is imported from the copy of Netty shaded into
+ * grpc-netty, so only direct memory counted by that shaded Netty is reported; buffers from any
+ * other (unshaded) Netty in the same JVM are not included. Confirm this is the intended scope.
+ *
+ * <p>NOTE(review): ShuffleServer constructs this tracker and stops it on shutdown, but this change
+ * adds no call to {@link #start()}, so the gauge is never updated. Add the missing {@code start()}
+ * call where the server starts its other services.
+ */
+public class NettyDirectMemoryTracker {
+
+  private static final Logger LOG = LoggerFactory.getLogger(NettyDirectMemoryTracker.class);
+
+  /** Delay in milliseconds before the first sample is taken. */
+  private final long reportInitialDelay;
+  /** Period in milliseconds between consecutive samples; must be positive. */
+  private final long reportInterval;
+  /** Single-threaded scheduler that runs the sampling task. */
+  private final ScheduledExecutorService service =
+      Executors.newSingleThreadScheduledExecutor(
+          ThreadUtils.getThreadFactory("NettyDirectMemoryTracker"));
+  /**
+   * Whether the "counter unavailable" warning has already been logged. Only accessed from the
+   * single scheduler thread, whose runs never overlap, so no synchronization is needed.
+   */
+  private boolean counterUnavailableWarned = false;
+
+  /**
+   * Creates a tracker. No sampling happens until {@link #start()} is called.
+   *
+   * @param conf shuffle server configuration supplying the initial delay and interval (ms)
+   */
+  public NettyDirectMemoryTracker(ShuffleServerConf conf) {
+    this.reportInitialDelay =
+        conf.getLong(ShuffleServerConf.SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_DELAY);
+    this.reportInterval =
+        conf.getLong(ShuffleServerConf.SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_INTERVAL);
+  }
+
+  /**
+   * Schedules the sampling task at a fixed rate.
+   *
+   * @throws IllegalArgumentException if the configured sampling interval is not positive
+   */
+  public void start() {
+    if (reportInterval <= 0) {
+      // scheduleAtFixedRate rejects this as well; fail with a message that names the config key.
+      throw new IllegalArgumentException(
+          "Invalid "
+              + ShuffleServerConf.SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_INTERVAL.key()
+              + ": "
+              + reportInterval
+              + "ms, it must be positive");
+    }
+    LOG.info(
+        "Start report direct memory usage to MetricSystem after {}ms and interval is {}ms",
+        reportInitialDelay,
+        reportInterval);
+
+    service.scheduleAtFixedRate(
+        this::reportDirectMemoryUsage, reportInitialDelay, reportInterval, TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Samples the direct memory counter once and publishes it to the gauge. Never throws, because an
+   * exception escaping a fixed-rate task would suppress all of its later executions.
+   */
+  private void reportDirectMemoryUsage() {
+    try {
+      long usedDirectMemory = PlatformDependent.usedDirectMemory();
+      if (usedDirectMemory < 0) {
+        // Netty returns -1 when its internal direct memory counter is disabled; publishing that
+        // sentinel would expose a meaningless negative size, so warn once and skip instead.
+        if (!counterUnavailableWarned) {
+          LOG.warn(
+              "Netty direct memory counter is unavailable (usedDirectMemory() returned {}),"
+                  + " skip reporting direct memory usage.",
+              usedDirectMemory);
+          counterUnavailableWarned = true;
+        }
+        return;
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Current direct memory usage: {}", usedDirectMemory);
+      }
+      ShuffleServerMetrics.gaugeUsedDirectMemorySize.set(usedDirectMemory);
+    } catch (Throwable t) {
+      LOG.error("Failed to report direct memory.", t);
+    }
+  }
+
+  /** Shuts the scheduler down immediately; no further samples are taken afterwards. */
+  public void stop() {
+    service.shutdownNow();
+  }
+}
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
index a5e230eb3..045b6f93f 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServer.java
@@ -75,6 +75,7 @@ public class ShuffleServer {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(ShuffleServer.class);
   private RegisterHeartBeat registerHeartBeat;
+  private NettyDirectMemoryTracker directMemoryUsageReporter;
   private String id;
   private String ip;
   private int grpcPort;
@@ -168,6 +169,10 @@ public class ShuffleServer {
       registerHeartBeat.shutdown();
       LOG.info("HeartBeat Stopped!");
     }
+    if (directMemoryUsageReporter != null) {
+      directMemoryUsageReporter.stop();
+      LOG.info("Direct memory usage tracker Stopped!");
+    }
     if (storageManager != null) {
       storageManager.stop();
       LOG.info("MultiStorage Stopped!");
@@ -259,6 +264,7 @@ public class ShuffleServer {
     }
 
     registerHeartBeat = new RegisterHeartBeat(this);
+    directMemoryUsageReporter = new 
NettyDirectMemoryTracker(shuffleServerConf);
     shuffleFlushManager = new ShuffleFlushManager(shuffleServerConf, this, 
storageManager);
     shuffleBufferManager = new ShuffleBufferManager(shuffleServerConf, 
shuffleFlushManager);
     shuffleTaskManager =
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 802455a42..0af62d420 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -77,6 +77,18 @@ public class ShuffleServerConf extends RssBaseConf {
           .defaultValue(10 * 1000L)
           .withDescription("Heartbeat interval to Coordinator (ms)");
 
+  /**
+   * Delay in milliseconds before NettyDirectMemoryTracker publishes its first direct memory usage
+   * sample to the metrics system.
+   */
+  public static final ConfigOption<Long> 
SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_DELAY =
+      
ConfigOptions.key("rss.server.netty.directMemoryTracker.memoryUsage.initialFetchDelayMs")
+          .longType()
+          .defaultValue(10 * 1000L)
+          .withDescription("Direct memory usage tracker initial delay (ms)");
+
+  /**
+   * Interval in milliseconds between the direct memory usage samples that NettyDirectMemoryTracker
+   * publishes to the metrics system. It is used as a fixed-rate scheduling period, so it must be
+   * positive.
+   */
+  public static final ConfigOption<Long> 
SERVER_NETTY_DIRECT_MEMORY_USAGE_TRACKER_INTERVAL =
+      
ConfigOptions.key("rss.server.netty.directMemoryTracker.memoryUsage.updateMetricsIntervalMs")
+          .longType()
+          .defaultValue(10 * 1000L)
+          .withDescription("Direct memory usage tracker interval to 
MetricSystem (ms)");
+
   public static final ConfigOption<Integer> 
SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE =
       ConfigOptions.key("rss.server.flush.localfile.threadPool.size")
           .intType()
diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
index 8f6f177b1..7ab54e9c7 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerMetrics.java
@@ -72,6 +72,7 @@ public class ShuffleServerMetrics {
   private static final String IN_FLUSH_BUFFER_SIZE = "in_flush_buffer_size";
   private static final String USED_BUFFER_SIZE = "used_buffer_size";
   private static final String READ_USED_BUFFER_SIZE = "read_used_buffer_size";
+  /** Name of the gauge that NettyDirectMemoryTracker fills with Netty's used direct memory. */
+  private static final String USED_DIRECT_MEMORY_SIZE = 
"used_direct_memory_size";
   private static final String TOTAL_FAILED_WRITTEN_EVENT_NUM = 
"total_failed_written_event_num";
   private static final String TOTAL_DROPPED_EVENT_NUM = 
"total_dropped_event_num";
   private static final String TOTAL_HADOOP_WRITE_DATA = 
"total_hadoop_write_data";
@@ -163,6 +164,7 @@ public class ShuffleServerMetrics {
   public static Gauge.Child gaugeInFlushBufferSize;
   public static Gauge.Child gaugeUsedBufferSize;
   public static Gauge.Child gaugeReadBufferUsedSize;
+  /** Netty used direct memory size, updated periodically by NettyDirectMemoryTracker. */
+  public static Gauge.Child gaugeUsedDirectMemorySize;
   public static Gauge.Child gaugeWriteHandler;
   public static Gauge.Child gaugeEventQueueSize;
   public static Gauge.Child gaugeAppNum;
@@ -328,6 +330,7 @@ public class ShuffleServerMetrics {
     gaugeInFlushBufferSize = 
metricsManager.addLabeledGauge(IN_FLUSH_BUFFER_SIZE);
     gaugeUsedBufferSize = metricsManager.addLabeledGauge(USED_BUFFER_SIZE);
     gaugeReadBufferUsedSize = 
metricsManager.addLabeledGauge(READ_USED_BUFFER_SIZE);
+    gaugeUsedDirectMemorySize = 
metricsManager.addLabeledGauge(USED_DIRECT_MEMORY_SIZE);
     gaugeWriteHandler = metricsManager.addLabeledGauge(TOTAL_WRITE_HANDLER);
     gaugeEventQueueSize = metricsManager.addLabeledGauge(EVENT_QUEUE_SIZE);
     gaugeAppNum = metricsManager.addLabeledGauge(APP_NUM_WITH_NODE);

Reply via email to