This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new afbb0fc170c Add new metrics exporting the total max and used memory by 
Netty and gRPC (#16939)
afbb0fc170c is described below

commit afbb0fc170cdddcd7abc07c72ddf2a0791de3cff
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Mon Oct 6 11:20:27 2025 +0200

    Add new metrics exporting the total max and used memory by Netty and gRPC 
(#16939)
---
 .github/workflows/pinot_tests.yml                  |  6 ++++++
 .mvn/jvm.config                                    |  8 ++++++++
 .../apache/pinot/broker/grpc/BrokerGrpcServer.java | 10 ++++++++++
 .../apache/pinot/common/metrics/BrokerGauge.java   | 11 ++++++++++-
 .../apache/pinot/common/metrics/ServerGauge.java   | 17 ++++++++++++++++
 pinot-connectors/pinot-spark-3-connector/pom.xml   |  3 +++
 .../apache/pinot/core/transport/QueryServer.java   |  4 ++++
 .../pinot/core/transport/grpc/GrpcQueryServer.java | 10 ++++++++++
 .../query/mailbox/channel/GrpcMailboxServer.java   | 23 ++++++++++++++++++++++
 .../src/main/resources/appAssemblerScriptTemplate  |  3 +++
 pom.xml                                            |  3 +++
 11 files changed, 97 insertions(+), 1 deletion(-)

diff --git a/.github/workflows/pinot_tests.yml 
b/.github/workflows/pinot_tests.yml
index 6f71f8617ca..dcdc5a57a12 100644
--- a/.github/workflows/pinot_tests.yml
+++ b/.github/workflows/pinot_tests.yml
@@ -185,6 +185,9 @@ jobs:
             --add-opens=java.base/java.lang=ALL-UNNAMED
             --add-opens=java.base/java.util=ALL-UNNAMED
             --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+            --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
+            -Dio.netty.tryReflectionSetAccessible=true
+            -Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true
         run: .github/workflows/scripts/pr-tests/.pinot_tests_build.sh
       - name: Unit Test
         env:
@@ -206,6 +209,9 @@ jobs:
             --add-opens=java.base/java.lang=ALL-UNNAMED
             --add-opens=java.base/java.util=ALL-UNNAMED
             --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+            --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
+            -Dio.netty.tryReflectionSetAccessible=true
+            -Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true
         run: .github/workflows/scripts/pr-tests/.pinot_tests_unit.sh
       - name: Upload coverage to Codecov
         uses: codecov/codecov-action@v5
diff --git a/.mvn/jvm.config b/.mvn/jvm.config
new file mode 100644
index 00000000000..d71cbc89466
--- /dev/null
+++ b/.mvn/jvm.config
@@ -0,0 +1,8 @@
+--add-opens=java.base/java.nio=ALL-UNNAMED
+--add-opens=java.base/sun.nio.ch=ALL-UNNAMED
+--add-opens=java.base/java.lang=ALL-UNNAMED
+--add-opens=java.base/java.util=ALL-UNNAMED
+--add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
+-Dio.netty.tryReflectionSetAccessible=true
+-Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true
\ No newline at end of file
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java 
b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
index c2de7bc1b8e..b4ba3fcdd02 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
@@ -35,6 +35,7 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
+import io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.Map;
@@ -168,6 +169,15 @@ public class BrokerGrpcServer extends 
PinotQueryBrokerGrpc.PinotQueryBrokerImplB
     
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_CACHE_SIZE_NORMAL,
 metric::normalCacheSize);
     
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_THREADLOCALCACHE,
 metric::numThreadLocalCaches);
     
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_NETTY_POOLED_CHUNK_SIZE, 
metric::chunkSize);
+    // Notice here we are using 
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent instead of
+    // io.netty.util.internal.PlatformDependent because gRPC shades Netty to 
avoid version conflicts.
+    // This also means it uses a different pool of direct memory and a 
different setting of max direct memory.
+    //
+    // Note that these two metrics are also set by GrpcMailboxServer. Both 
are set to the same value, so it
+    // doesn't matter which one _wins_ in the metrics system.
+    
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_TOTAL_MAX_DIRECT_MEMORY, 
PlatformDependent::maxDirectMemory);
+    
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_TOTAL_USED_DIRECT_MEMORY,
+        PlatformDependent::usedDirectMemory);
   }
 
   public void start() {
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index c2e602e03ea..ccd9fcda9da 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -97,7 +97,16 @@ public enum BrokerGauge implements AbstractMetrics.Gauge {
   MAILBOX_SERVER_CACHE_SIZE_SMALL("bytes", true),
   MAILBOX_SERVER_CACHE_SIZE_NORMAL("bytes", true),
   MAILBOX_SERVER_THREADLOCALCACHE("bytes", true),
-  MAILBOX_SERVER_CHUNK_SIZE("bytes", true);
+  MAILBOX_SERVER_CHUNK_SIZE("bytes", true),
+
+  /// Exports the max amount of direct memory that can be allocated by the 
shaded Netty code used by gRPC
+  /// It is basically an adaptor for 
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.maxDirectMemory()
+  ///
+  /// This value can be changed by setting the JVM option 
-Dio.grpc.netty.shaded.io.netty.maxDirectMemory
+  GRPC_TOTAL_MAX_DIRECT_MEMORY("bytes", true),
+  /// Exports the total amount of direct memory allocated by the shaded Netty 
code used by gRPC
+  /// It is basically an adaptor for 
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.usedDirectMemory()
+  GRPC_TOTAL_USED_DIRECT_MEMORY("bytes", true);
 
   private final String _brokerGaugeName;
   private final String _unit;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index c2864c0310f..dcc8d6776c5 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -123,6 +123,23 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   MAILBOX_SERVER_THREADLOCALCACHE("bytes", true),
   MAILBOX_SERVER_CHUNK_SIZE("bytes", true),
 
+  /// Exports the max amount of direct memory that can be allocated by Netty
+  /// It is basically an adaptor for 
io.netty.util.internal.PlatformDependent.maxDirectMemory()
+  ///
+  /// This value can be changed by setting the JVM option 
-Dio.netty.maxDirectMemory
+  NETTY_TOTAL_MAX_DIRECT_MEMORY("bytes", true),
+  /// Exports the total amount of direct memory allocated by Netty
+  /// It is basically an adaptor for 
io.netty.util.internal.PlatformDependent.usedDirectMemory()
+  NETTY_TOTAL_USED_DIRECT_MEMORY("bytes", true),
+  /// Exports the max amount of direct memory that can be allocated by the 
shaded Netty code used by gRPC
+  /// It is basically an adaptor for 
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.maxDirectMemory()
+  ///
+  /// This value can be changed by setting the JVM option 
-Dio.grpc.netty.shaded.io.netty.maxDirectMemory
+  GRPC_TOTAL_MAX_DIRECT_MEMORY("bytes", true),
+  /// Exports the total amount of direct memory allocated by the shaded Netty 
code used by gRPC
+  /// It is basically an adaptor for 
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.usedDirectMemory()
+  GRPC_TOTAL_USED_DIRECT_MEMORY("bytes", true),
+
   // how many messages are there in the server's message queue in helix
   HELIX_MESSAGES_COUNT("count", true),
   STARTUP_STATUS_CHECK_IN_PROGRESS("state", true,
diff --git a/pinot-connectors/pinot-spark-3-connector/pom.xml 
b/pinot-connectors/pinot-spark-3-connector/pom.xml
index f2304e55ba7..aaff014e4a2 100644
--- a/pinot-connectors/pinot-spark-3-connector/pom.xml
+++ b/pinot-connectors/pinot-spark-3-connector/pom.xml
@@ -156,6 +156,9 @@
             --add-opens=java.base/java.lang=ALL-UNNAMED
             --add-opens=java.base/java.util=ALL-UNNAMED
             --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+            --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
+            -Dio.netty.tryReflectionSetAccessible=true
+            -Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true
           </argLine>
         </configuration>
       </plugin>
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index 384f2d31338..765c474918f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -36,6 +36,7 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.ServerSocketChannel;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.util.internal.PlatformDependent;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.config.NettyConfig;
@@ -130,6 +131,9 @@ public class QueryServer {
       
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_CACHE_SIZE_NORMAL, 
metric::normalCacheSize);
       
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_THREADLOCALCACHE, 
metric::numThreadLocalCaches);
       metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_CHUNK_SIZE, 
metric::chunkSize);
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_TOTAL_MAX_DIRECT_MEMORY, 
PlatformDependent::maxDirectMemory);
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_TOTAL_USED_DIRECT_MEMORY, 
PlatformDependent::usedDirectMemory);
+
       _channel = (ServerSocketChannel) serverBootstrap.group(_bossGroup, 
_workerGroup).channel(_channelClass)
           .option(ChannelOption.SO_BACKLOG, 128)
           .childOption(ChannelOption.SO_KEEPALIVE, true)
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
index 75ed3b6075e..5f0cf528aa6 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/grpc/GrpcQueryServer.java
@@ -32,6 +32,7 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.ClientAuth;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
+import io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.Map;
@@ -147,6 +148,15 @@ public class GrpcQueryServer extends 
PinotQueryServerGrpc.PinotQueryServerImplBa
       
metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_THREADLOCALCACHE, 
metric::numThreadLocalCaches);
       metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_NETTY_POOLED_CHUNK_SIZE, 
metric::chunkSize);
 
+      // Notice here we are using 
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent instead of
+      // io.netty.util.internal.PlatformDependent because gRPC shades Netty to 
avoid version conflicts.
+      // This also means it uses a different pool of direct memory and a 
different setting of max direct memory.
+      //
+      // Note that these two metrics are also set by the MSE query engine 
(GrpcMailboxServer). Both are set to the same value, so it
+      // doesn't matter which one _wins_ in the metrics system.
+      metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_TOTAL_MAX_DIRECT_MEMORY, 
PlatformDependent::maxDirectMemory);
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_TOTAL_USED_DIRECT_MEMORY, 
PlatformDependent::usedDirectMemory);
+
       _server = builder
           .maxInboundMessageSize(config.getMaxInboundMessageSizeBytes())
           .addService(this)
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
index 5367c92eb4e..92bf84be5fd 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
@@ -24,6 +24,7 @@ import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder;
 import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator;
 import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocatorMetric;
 import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
+import io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
@@ -79,6 +80,17 @@ public class GrpcMailboxServer extends 
PinotMailboxGrpc.PinotMailboxImplBase {
       
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_CACHE_SIZE_NORMAL,
 metric::normalCacheSize);
       
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_THREADLOCALCACHE,
 metric::numThreadLocalCaches);
       
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_SERVER_CHUNK_SIZE, 
metric::chunkSize);
+
+      // Notice here we are using 
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent instead of
+      // io.netty.util.internal.PlatformDependent because gRPC shades Netty to 
avoid version conflicts.
+      // This also means it uses a different pool of direct memory and a 
different setting of max direct memory.
+      //
+      // Note that these two metrics are also set by BrokerGrpcServer. Both 
are set to the same value, so it
+      // doesn't matter which one _wins_ in the metrics system.
+      
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_TOTAL_MAX_DIRECT_MEMORY,
+          PlatformDependent::maxDirectMemory);
+      
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.GRPC_TOTAL_USED_DIRECT_MEMORY,
+          PlatformDependent::usedDirectMemory);
     } else {
       Preconditions.checkState(instanceType == InstanceType.SERVER, 
"Unexpected instance type: %s", instanceType);
       ServerMetrics serverMetrics = ServerMetrics.get();
@@ -90,6 +102,17 @@ public class GrpcMailboxServer extends 
PinotMailboxGrpc.PinotMailboxImplBase {
       
serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_CACHE_SIZE_NORMAL,
 metric::normalCacheSize);
       
serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_THREADLOCALCACHE,
 metric::numThreadLocalCaches);
       
serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_SERVER_CHUNK_SIZE, 
metric::chunkSize);
+
+      // Notice here we are using 
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent instead of
+      // io.netty.util.internal.PlatformDependent because gRPC shades Netty to 
avoid version conflicts.
+      // This also means it uses a different pool of direct memory and a 
different setting of max direct memory.
+      //
+      // Note that these two metrics are also set by GrpcQueryServer. Both 
are set to the same value, so it
+      // doesn't matter which one _wins_ in the metrics system.
+      
serverMetrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_TOTAL_MAX_DIRECT_MEMORY,
+          PlatformDependent::maxDirectMemory);
+      
serverMetrics.setOrUpdateGlobalGauge(ServerGauge.GRPC_TOTAL_USED_DIRECT_MEMORY,
+          PlatformDependent::usedDirectMemory);
     }
 
     NettyServerBuilder builder = NettyServerBuilder
diff --git a/pinot-tools/src/main/resources/appAssemblerScriptTemplate 
b/pinot-tools/src/main/resources/appAssemblerScriptTemplate
index f639f3d653c..ba6bb9893f2 100644
--- a/pinot-tools/src/main/resources/appAssemblerScriptTemplate
+++ b/pinot-tools/src/main/resources/appAssemblerScriptTemplate
@@ -206,6 +206,9 @@ if [ "$(jdk_version)" -gt 11 ]; then
   --add-opens=java.base/java.lang=ALL-UNNAMED \
   --add-opens=java.base/java.util=ALL-UNNAMED \
   --add-opens=java.base/java.lang.reflect=ALL-UNNAMED \
+  --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED \
+  -Dio.netty.tryReflectionSetAccessible=true \
+  -Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true \
   $JAVA_OPTS"
 fi
 
diff --git a/pom.xml b/pom.xml
index 914bc6b44a5..5485514eb8f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2288,6 +2288,9 @@
               --add-opens=java.base/java.util=ALL-UNNAMED
               --add-exports=java.base/jdk.internal.util.random=ALL-UNNAMED
               --add-opens=java.base/java.lang.reflect=ALL-UNNAMED
+              --add-opens=java.base/jdk.internal.misc=ALL-UNNAMED
+              -Dio.netty.tryReflectionSetAccessible=true
+              -Dio.grpc.netty.shaded.io.netty.tryReflectionSetAccessible=true
               -Dnet.bytebuddy.experimental=true
               
-javaagent:${settings.localRepository}/org/mockito/mockito-core/${mockito-core.version}/mockito-core-${mockito-core.version}.jar
               -Xshare:off


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to