This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new 7b631773e [#1686] feat(netty): Support pending tasks number metrics for Netty EventLoopGroup (#1687)
7b631773e is described below

commit 7b631773e822773634c0c536f9aedc9676f4a3ea
Author: RickyMa <[email protected]>
AuthorDate: Sat May 11 11:14:18 2024 +0800

    [#1686] feat(netty): Support pending tasks number metrics for Netty EventLoopGroup (#1687)
    
    ### What changes were proposed in this pull request?
    
    Support pending tasks number metrics for Netty EventLoopGroup.
    
    ### Why are the changes needed?
    
    For https://github.com/apache/incubator-uniffle/issues/1686.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Existing tests.
---
 .../uniffle/common/metrics/NettyMetrics.java       | 18 +++++
 docs/server_guide.md                               | 77 +++++++++++-----------
 .../apache/uniffle/server/ShuffleServerConf.java   |  7 ++
 .../apache/uniffle/server/netty/StreamServer.java  | 45 +++++++++++++
 .../uniffle/server/ShuffleServerMetricsTest.java   |  2 +-
 5 files changed, 110 insertions(+), 39 deletions(-)

diff --git a/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java b/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
index 0879e2075..b41f57690 100644
--- a/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
+++ b/common/src/main/java/org/apache/uniffle/common/metrics/NettyMetrics.java
@@ -26,9 +26,15 @@ public abstract class NettyMetrics extends RPCMetrics {
 
   private static final String NETTY_ACTIVE_CONNECTION = 
"netty_active_connection";
   private static final String NETTY_HANDLE_EXCEPTION = 
"netty_handle_exception";
+  private static final String NETTY_PENDING_TASKS_NUM_FOR_BOSS_GROUP =
+      "netty_pending_tasks_num_for_boss_group";
+  private static final String NETTY_PENDING_TASKS_NUM_FOR_WORKER_GROUP =
+      "netty_pending_tasks_num_for_worker_group";
 
   protected Gauge.Child gaugeNettyActiveConn;
   protected Counter.Child counterNettyException;
+  protected Gauge.Child gaugeNettyPendingTasksNumForBossGroup;
+  protected Gauge.Child gaugeNettyPendingTasksNumForWorkerGroup;
 
   public NettyMetrics(RssConf rssConf, String tags) {
     super(rssConf, tags);
@@ -38,6 +44,10 @@ public abstract class NettyMetrics extends RPCMetrics {
   public void registerGeneralMetrics() {
     gaugeNettyActiveConn = 
metricsManager.addLabeledGauge(NETTY_ACTIVE_CONNECTION);
     counterNettyException = 
metricsManager.addLabeledCounter(NETTY_HANDLE_EXCEPTION);
+    gaugeNettyPendingTasksNumForBossGroup =
+        metricsManager.addLabeledGauge(NETTY_PENDING_TASKS_NUM_FOR_BOSS_GROUP);
+    gaugeNettyPendingTasksNumForWorkerGroup =
+        
metricsManager.addLabeledGauge(NETTY_PENDING_TASKS_NUM_FOR_WORKER_GROUP);
   }
 
   public Counter.Child getCounterNettyException() {
@@ -47,4 +57,12 @@ public abstract class NettyMetrics extends RPCMetrics {
   public Gauge.Child getGaugeNettyActiveConn() {
     return gaugeNettyActiveConn;
   }
+
+  public Gauge.Child getGaugeNettyPendingTasksNumForBossGroup() {
+    return gaugeNettyPendingTasksNumForBossGroup;
+  }
+
+  public Gauge.Child getGaugeNettyPendingTasksNumForWorkerGroup() {
+    return gaugeNettyPendingTasksNumForWorkerGroup;
+  }
 }
diff --git a/docs/server_guide.md b/docs/server_guide.md
index 8799fc4a2..ec2d24fcc 100644
--- a/docs/server_guide.md
+++ b/docs/server_guide.md
@@ -69,44 +69,45 @@ This document will introduce how to deploy Uniffle shuffle servers.
    ```
 
 ## Configuration
-| Property Name                                           | Default            
                                                    | Description               
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-|---------------------------------------------------------|------------------------------------------------------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| rss.coordinator.quorum                                  | -                  
                                                    | Coordinator quorum        
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
-| rss.rpc.server.type                                     | GRPC               
                                                    | Shuffle server type, 
supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using 
GRPC_NETTY to enable Netty on the server side for better stability and 
performance.                                                                    
                                                                                
                               [...]
-| rss.rpc.server.port                                     | 19999              
                                                    | RPC port for Shuffle 
server, if set zero, grpc server start on random port.                          
                                                                                
                                                                                
                                                                                
                   [...]
-| rss.jetty.http.port                                     | 19998              
                                                    | Http port for Shuffle 
server                                                                          
                                                                                
                                                                                
                                                                                
                  [...]
-| rss.server.netty.port                                   | -1                 
                                                    | Netty port for Shuffle 
server, if set zero, Netty server start on random port.                         
                                                                                
                                                                                
                                                                                
                 [...]
-| rss.server.netty.epoll.enable                           | false              
                                                    | Whether to enable epoll 
model with Netty server.                                                        
                                                                                
                                                                                
                                                                                
                [...]
-| rss.server.netty.accept.thread                          | 10                 
                                                    | Accept thread count in 
netty.                                                                          
                                                                                
                                                                                
                                                                                
                 [...]
-| rss.server.netty.worker.thread                          | 0                  
                                                    | Worker thread count in 
netty. When set to 0, the default value is dynamically set to twice the number 
of processor cores, but it will not be less than 100 to ensure the minimum 
throughput of the service.                                                      
                                                                                
                       [...]
-| rss.server.netty.connect.backlog                        | 0                  
                                                    | For Netty server, 
requested maximum length of the queue of incoming connections.                  
                                                                                
                                                                                
                                                                                
                      [...]
-| rss.server.netty.connect.timeout                        | 5000               
                                                    | Timeout for connection in 
netty.                                                                          
                                                                                
                                                                                
                                                                                
              [...]
-| rss.server.netty.receive.buf                            | 0                  
                                                    | Receive buffer size 
(SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should 
be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 
10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system 
automatically estimates the receive buffer size based on default settings.      
                               [...]
-| rss.server.netty.send.buf                               | 0                  
                                                    | Send buffer size 
(SO_SNDBUF).                                                                    
                                                                                
                                                                                
                                                                                
                       [...]
-| rss.server.buffer.capacity                              | -1                 
                                                    | Max memory of buffer 
manager for shuffle server. If negative, JVM heap size * buffer.ratio is used   
                                                                                
                                                                                
                                                                                
                   [...]
-| rss.server.buffer.capacity.ratio                        | 0.6                
                                                    | when 
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or 
off-heap size(when enabling Netty) * ratio                                      
                                                                                
                                                                                
                                     [...]
-| rss.server.memory.shuffle.highWaterMark.percentage      | 75.0               
                                                    | Threshold of spill data 
to storage, percentage of rss.server.buffer.capacity                            
                                                                                
                                                                                
                                                                                
                [...]
-| rss.server.memory.shuffle.lowWaterMark.percentage       | 25.0               
                                                    | Threshold of keep data in 
memory, percentage of rss.server.buffer.capacity                                
                                                                                
                                                                                
                                                                                
              [...]
-| rss.server.read.buffer.capacity                         | -1                 
                                                    | Max size of buffer for 
reading data. If negative, JVM heap size * read.buffer.ratio is used            
                                                                                
                                                                                
                                                                                
                 [...]
-| rss.server.read.buffer.capacity.ratio                   | 0.2                
                                                    | when 
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap 
size or off-heap size(when enabling Netty) * ratio                              
                                                                                
                                                                                
                                       [...]
-| rss.server.heartbeat.interval                           | 10000              
                                                    | Heartbeat interval to 
Coordinator (ms)                                                                
                                                                                
                                                                                
                                                                                
                  [...]
-| rss.server.flush.localfile.threadPool.size              | 10                 
                                                    | Thread pool for flush 
data to local file                                                              
                                                                                
                                                                                
                                                                                
                  [...]
-| rss.server.flush.hadoop.threadPool.size                 | 60                 
                                                    | Thread pool for flush 
data to hadoop storage                                                          
                                                                                
                                                                                
                                                                                
                  [...]
-| rss.server.commit.timeout                               | 600000             
                                                    | Timeout when commit 
shuffle data (ms)                                                               
                                                                                
                                                                                
                                                                                
                    [...]
-| rss.storage.type                                        | -                  
                                                    | Supports 
MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS                            
                                                                                
                                                                                
                                                                                
                               [...]
-| rss.server.flush.cold.storage.threshold.size            | 64M                
                                                    | The threshold of data 
size for LOACALFILE and HADOOP if MEMORY_LOCALFILE_HDFS is used                 
                                                                                
                                                                                
                                                                                
                  [...]
-| rss.server.tags                                         | -                  
                                                    | The comma-separated list 
of tags to indicate the shuffle server's attributes. It will be used as the 
assignment basis for the coordinator                                            
                                                                                
                                                                                
                   [...]
-| rss.server.single.buffer.flush.enabled                  | true               
                                                    | Whether single buffer 
flush when size exceeded rss.server.single.buffer.flush.threshold               
                                                                                
                                                                                
                                                                                
                  [...]
-| rss.server.single.buffer.flush.threshold                | 128M               
                                                    | The threshold of single 
shuffle buffer flush                                                            
                                                                                
                                                                                
                                                                                
                [...]
-| rss.server.disk.capacity                                | -1                 
                                                    | Disk capacity that 
shuffle server can use. If negative, it will use disk whole space * ratio       
                                                                                
                                                                                
                                                                                
                     [...]
-| rss.server.disk.capacity.ratio                          | 0.9                
                                                    | When 
`rss.server.disk.capacity` is negative, disk whole space * ratio is used        
                                                                                
                                                                                
                                                                                
                                   [...]
-| rss.server.hybrid.storage.fallback.strategy.class       | -                  
                                                    | The fallback strategy for 
`MEMORY_LOCALFILE_HDFS`. Support 
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
 and `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy`. 
If not set, `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackSt 
[...]
-| rss.server.leak.shuffledata.check.interval              | 3600000            
                                                    | The interval of leak 
shuffle data check (ms)                                                         
                                                                                
                                                                                
                                                                                
                   [...]
-| rss.server.max.concurrency.of.per-partition.write       | 30                 
                                                    | The max concurrency of 
single partition writer, the data partition file number is equal to this value. 
Default value is 1. This config could improve the writing speed, especially for 
huge partition.                                                                 
                                                                                
                 [...]
-| rss.server.max.concurrency.limit.of.per-partition.write | -                  
                                                    | The limit for max 
concurrency per-partition write specified by client, this won't be enabled by 
default.                                                                        
                                                                                
                                                                                
                        [...]
-| rss.metrics.reporter.class                              | -                  
                                                    | The class of metrics 
reporter.                                                                       
                                                                                
                                                                                
                                                                                
                   [...]
-| rss.server.hybrid.storage.manager.selector.class        | 
org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The 
manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is 
`DefaultStorageManagerSelector`, and another 
`HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's 
data to cold storage.                                                           
                                                                               
[...]
-| rss.server.disk-capacity.watermark.check.enabled        | false              
                                                    | If it is co-located with 
other services, the high-low watermark check based on the uniffle used is not 
correct. Due to this, the whole disk capacity watermark check is necessary, 
which will reuse the current watermark value. It will be disabled by default.   
                                                                                
                     [...]
+| Property Name                                            | Default           
                                                     | Description              
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+|----------------------------------------------------------|------------------------------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
+| rss.coordinator.quorum                                   | -                 
                                                     | Coordinator quorum       
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| rss.rpc.server.type                                      | GRPC              
                                                     | Shuffle server type, 
supports GRPC_NETTY, GRPC. The default value is GRPC. But we recommend using 
GRPC_NETTY to enable Netty on the server side for better stability and 
performance.                                                                    
                                                                                
                              [...]
+| rss.rpc.server.port                                      | 19999             
                                                     | RPC port for Shuffle 
server, if set zero, grpc server start on random port.                          
                                                                                
                                                                                
                                                                                
                  [...]
+| rss.jetty.http.port                                      | 19998             
                                                     | Http port for Shuffle 
server                                                                          
                                                                                
                                                                                
                                                                                
                 [...]
+| rss.server.netty.port                                    | -1                
                                                     | Netty port for Shuffle 
server, if set zero, Netty server start on random port.                         
                                                                                
                                                                                
                                                                                
                [...]
+| rss.server.netty.epoll.enable                            | false             
                                                     | Whether to enable epoll 
model with Netty server.                                                        
                                                                                
                                                                                
                                                                                
               [...]
+| rss.server.netty.accept.thread                           | 10                
                                                     | Accept thread count in 
netty.                                                                          
                                                                                
                                                                                
                                                                                
                [...]
+| rss.server.netty.worker.thread                           | 0                 
                                                     | Worker thread count in 
netty. When set to 0, the default value is dynamically set to twice the number 
of processor cores, but it will not be less than 100 to ensure the minimum 
throughput of the service.                                                      
                                                                                
                      [...]
+| rss.server.netty.connect.backlog                         | 0                 
                                                     | For Netty server, 
requested maximum length of the queue of incoming connections.                  
                                                                                
                                                                                
                                                                                
                     [...]
+| rss.server.netty.connect.timeout                         | 5000              
                                                     | Timeout for connection 
in netty.                                                                       
                                                                                
                                                                                
                                                                                
                [...]
+| rss.server.netty.receive.buf                             | 0                 
                                                     | Receive buffer size 
(SO_RCVBUF). Note: the optimal size for receive buffer and send buffer should 
be latency * network_bandwidth. Assuming latency = 1ms, network_bandwidth = 
10Gbps, buffer size should be ~ 1.25MB. Default is 0, the operating system 
automatically estimates the receive buffer size based on default settings.      
                              [...]
+| rss.server.netty.send.buf                                | 0                 
                                                     | Send buffer size 
(SO_SNDBUF).                                                                    
                                                                                
                                                                                
                                                                                
                      [...]
+| rss.server.buffer.capacity                               | -1                
                                                     | Max memory of buffer 
manager for shuffle server. If negative, JVM heap size * buffer.ratio is used   
                                                                                
                                                                                
                                                                                
                  [...]
+| rss.server.buffer.capacity.ratio                         | 0.6               
                                                     | when 
`rss.server.buffer.capacity`=-1, then the buffer capacity is JVM heap size or 
off-heap size(when enabling Netty) * ratio                                      
                                                                                
                                                                                
                                    [...]
+| rss.server.memory.shuffle.highWaterMark.percentage       | 75.0              
                                                     | Threshold of spill data 
to storage, percentage of rss.server.buffer.capacity                            
                                                                                
                                                                                
                                                                                
               [...]
+| rss.server.memory.shuffle.lowWaterMark.percentage        | 25.0              
                                                     | Threshold of keep data 
in memory, percentage of rss.server.buffer.capacity                             
                                                                                
                                                                                
                                                                                
                [...]
+| rss.server.read.buffer.capacity                          | -1                
                                                     | Max size of buffer for 
reading data. If negative, JVM heap size * read.buffer.ratio is used            
                                                                                
                                                                                
                                                                                
                [...]
+| rss.server.read.buffer.capacity.ratio                    | 0.2               
                                                     | when 
`rss.server.read.buffer.capacity`=-1, then read buffer capacity is JVM heap 
size or off-heap size(when enabling Netty) * ratio                              
                                                                                
                                                                                
                                      [...]
+| rss.server.heartbeat.interval                            | 10000             
                                                     | Heartbeat interval to 
Coordinator (ms)                                                                
                                                                                
                                                                                
                                                                                
                 [...]
+| rss.server.netty.metrics.pendingTaskNumPollingIntervalMs | 10000             
                                                     | How often to collect 
Netty pending tasks number metrics (in milliseconds)                            
                                                                                
                                                                                
                                                                                
                  [...]
+| rss.server.flush.localfile.threadPool.size               | 10                
                                                     | Thread pool for flush 
data to local file                                                              
                                                                                
                                                                                
                                                                                
                 [...]
+| rss.server.flush.hadoop.threadPool.size                  | 60                
                                                     | Thread pool for flush 
data to hadoop storage                                                          
                                                                                
                                                                                
                                                                                
                 [...]
+| rss.server.commit.timeout                                | 600000            
                                                     | Timeout when commit 
shuffle data (ms)                                                               
                                                                                
                                                                                
                                                                                
                   [...]
+| rss.storage.type                                         | -                 
                                                     | Supports 
MEMORY_LOCALFILE, MEMORY_HDFS, MEMORY_LOCALFILE_HDFS                            
                                                                                
                                                                                
                                                                                
                              [...]
+| rss.server.flush.cold.storage.threshold.size             | 64M               
                                                     | The threshold of data 
size for LOCALFILE and HADOOP if MEMORY_LOCALFILE_HDFS is used                  
                                                                                
                                                                                
                                                                                
                 [...]
+| rss.server.tags                                          | -                 
                                                     | The comma-separated list 
of tags to indicate the shuffle server's attributes. It will be used as the 
assignment basis for the coordinator                                            
                                                                                
                                                                                
                  [...]
+| rss.server.single.buffer.flush.enabled                   | true              
                                                     | Whether to flush a 
single buffer when its size exceeds rss.server.single.buffer.flush.threshold    
                                                                                
                                                                                
                                                                                
                 [...]
+| rss.server.single.buffer.flush.threshold                 | 128M              
                                                     | The threshold of single 
shuffle buffer flush                                                            
                                                                                
                                                                                
                                                                                
               [...]
+| rss.server.disk.capacity                                 | -1                
                                                     | Disk capacity that 
shuffle server can use. If negative, it will use disk whole space * ratio       
                                                                                
                                                                                
                                                                                
                    [...]
+| rss.server.disk.capacity.ratio                           | 0.9               
                                                     | When 
`rss.server.disk.capacity` is negative, disk whole space * ratio is used        
                                                                                
                                                                                
                                                                                
                                  [...]
+| rss.server.hybrid.storage.fallback.strategy.class        | -                 
                                                     | The fallback strategy 
for `MEMORY_LOCALFILE_HDFS`. Support 
`org.apache.uniffle.server.storage.RotateStorageManagerFallbackStrategy`,`org.apache.uniffle.server.storage.LocalStorageManagerFallbackStrategy`
 and `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackStrategy`. 
If not set, `org.apache.uniffle.server.storage.HadoopStorageManagerFallbackS 
[...]
+| rss.server.leak.shuffledata.check.interval               | 3600000           
                                                     | The interval of leak 
shuffle data check (ms)                                                         
                                                                                
                                                                                
                                                                                
                  [...]
+| rss.server.max.concurrency.of.per-partition.write        | 30                
                                                     | The max concurrency of 
single partition writer, the data partition file number is equal to this value. 
Default value is 30. This config could improve the writing speed, especially for 
huge partition.                                                                 
                                                                                
                [...]
+| rss.server.max.concurrency.limit.of.per-partition.write  | -                 
                                                     | The limit for max 
concurrency per-partition write specified by client, this won't be enabled by 
default.                                                                        
                                                                                
                                                                                
                       [...]
+| rss.metrics.reporter.class                               | -                 
                                                     | The class of metrics 
reporter.                                                                       
                                                                                
                                                                                
                                                                                
                  [...]
+| rss.server.hybrid.storage.manager.selector.class         | 
org.apache.uniffle.server.storage.hybrid.DefaultStorageManagerSelector | The 
manager selector strategy for `MEMORY_LOCALFILE_HDFS`. Default value is 
`DefaultStorageManagerSelector`, and another 
`HugePartitionSensitiveStorageManagerSelector` will flush only huge partition's 
data to cold storage.                                                           
                                                                              
[...]
+| rss.server.disk-capacity.watermark.check.enabled         | false             
                                                     | If it is co-located with 
other services, the high-low watermark check based on the uniffle used is not 
correct. Due to this, the whole disk capacity watermark check is necessary, 
which will reuse the current watermark value. It will be disabled by default.   
                                                                                
                    [...]
 
 ### Advanced Configurations
 | Property Name                                    | Default | Description     
                                                                                
                                                                                
            |
diff --git 
a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java 
b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
index 088252779..fe3979012 100644
--- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
+++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java
@@ -89,6 +89,13 @@ public class ShuffleServerConf extends RssBaseConf {
           .defaultValue(10 * 1000L)
           .withDescription("Direct memory usage tracker interval to 
MetricSystem (ms)");
 
+  public static final ConfigOption<Long> 
SERVER_NETTY_PENDING_TASKS_NUM_TRACKER_INTERVAL =
+      
ConfigOptions.key("rss.server.netty.metrics.pendingTaskNumPollingIntervalMs")
+          .longType()
+          .defaultValue(10 * 1000L)
+          .withDescription(
+              "How often to collect Netty pending tasks number metrics (in 
milliseconds)");
+
   public static final ConfigOption<Integer> 
SERVER_FLUSH_LOCALFILE_THREAD_POOL_SIZE =
       ConfigOptions.key("rss.server.flush.localfile.threadPool.size")
           .intType()
diff --git 
a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java 
b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
index e1eabe060..f8410fcc9 100644
--- a/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
+++ b/server/src/main/java/org/apache/uniffle/server/netty/StreamServer.java
@@ -18,7 +18,10 @@
 package org.apache.uniffle.server.netty;
 
 import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.StreamSupport;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelFuture;
@@ -31,6 +34,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.util.NettyRuntime;
+import io.netty.util.concurrent.SingleThreadEventExecutor;
 import io.netty.util.internal.SystemPropertyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +47,7 @@ import org.apache.uniffle.common.util.Constants;
 import org.apache.uniffle.common.util.ExitUtils;
 import org.apache.uniffle.common.util.NettyUtils;
 import org.apache.uniffle.common.util.RssUtils;
+import org.apache.uniffle.common.util.ThreadUtils;
 import org.apache.uniffle.server.ShuffleServer;
 import org.apache.uniffle.server.ShuffleServerConf;
 
@@ -59,6 +64,12 @@ public class StreamServer implements ServerInterface {
   private ShuffleServerConf shuffleServerConf;
   private ChannelFuture channelFuture;
 
+  private final ScheduledExecutorService nettyPendingTasksNumTracker =
+      Executors.newSingleThreadScheduledExecutor(
+          ThreadUtils.getThreadFactory("NettyPendingTasksNumTracker"));
+  /** Interval to poll for Netty pending tasks number for Netty metrics, in 
milliseconds */
+  private final long pendingTasksNumMetricsPollingInterval;
+
   public StreamServer(ShuffleServer shuffleServer) {
     this.shuffleServer = shuffleServer;
     this.shuffleServerConf = shuffleServer.getShuffleServerConf();
@@ -80,6 +91,37 @@ public class StreamServer implements ServerInterface {
       shuffleBossGroup = new NioEventLoopGroup(acceptThreads);
       shuffleWorkerGroup = new NioEventLoopGroup(workerThreads);
     }
+    this.pendingTasksNumMetricsPollingInterval =
+        shuffleServerConf.getLong(
+            ShuffleServerConf.SERVER_NETTY_PENDING_TASKS_NUM_TRACKER_INTERVAL);
+    startMonitoringPendingTasks();
+  }
+
+  private void startMonitoringPendingTasks() {
+    nettyPendingTasksNumTracker.scheduleAtFixedRate(
+        () -> {
+          int pendingTasksNumForBossGroup = 
getPendingTasksForEventLoopGroup(shuffleBossGroup);
+          shuffleServer
+              .getNettyMetrics()
+              .getGaugeNettyPendingTasksNumForBossGroup()
+              .set(pendingTasksNumForBossGroup);
+
+          int pendingTasksNumForWorkerGroup = 
getPendingTasksForEventLoopGroup(shuffleWorkerGroup);
+          shuffleServer
+              .getNettyMetrics()
+              .getGaugeNettyPendingTasksNumForWorkerGroup()
+              .set(pendingTasksNumForWorkerGroup);
+        },
+        0L,
+        pendingTasksNumMetricsPollingInterval,
+        TimeUnit.MILLISECONDS);
+  }
+
+  private int getPendingTasksForEventLoopGroup(EventLoopGroup eventLoopGroup) {
+    return StreamSupport.stream(eventLoopGroup.spliterator(), false)
+        .filter(eventExecutor -> eventExecutor instanceof 
SingleThreadEventExecutor)
+        .mapToInt(eventExecutor -> ((SingleThreadEventExecutor) 
eventExecutor).pendingTasks())
+        .sum();
   }
 
   private ServerBootstrap bootstrapChannel(
@@ -177,6 +219,9 @@ public class StreamServer implements ServerInterface {
       shuffleBossGroup = null;
       shuffleWorkerGroup = null;
     }
+    if (!nettyPendingTasksNumTracker.isShutdown()) {
+      nettyPendingTasksNumTracker.shutdown();
+    }
   }
 
   @Override
diff --git 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
index a5824a5b0..c4462f1c2 100644
--- 
a/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
+++ 
b/server/src/test/java/org/apache/uniffle/server/ShuffleServerMetricsTest.java
@@ -239,7 +239,7 @@ public class ShuffleServerMetricsTest {
     ObjectMapper mapper = new ObjectMapper();
     JsonNode actualObj = mapper.readTree(content);
     assertEquals(2, actualObj.size());
-    assertEquals(66, actualObj.get("metrics").size());
+    assertEquals(68, actualObj.get("metrics").size());
   }
 
   @Test


Reply via email to