This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5bfb00818d Add metrics to export netty direct memory used and max 
(#11575)
5bfb00818d is described below

commit 5bfb00818da767a31bf8f0c7a9dd4976e99c74db
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Mon Oct 2 20:33:09 2023 +0200

    Add metrics to export netty direct memory used and max (#11575)
    
    * Add metrics to export netty direct memory used and max
    
    * Fix new netty metrics to use actual suppliers
    
    * Add utility methods AbstractMetrics.setOrUpdateGlobalGauge
    
    * Fix tests
    
    * Initialize ServerMetrics static instance in tests
---
 .../pinot/common/metrics/AbstractMetrics.java      | 30 ++++++++++++++++++++++
 .../apache/pinot/common/metrics/BrokerGauge.java   | 23 ++++++++++++++++-
 .../apache/pinot/common/metrics/ServerGauge.java   | 21 +++++++++++++++
 .../apache/pinot/common/metrics/ServerMetrics.java |  6 +++++
 .../apache/pinot/core/transport/QueryServer.java   | 17 ++++++++++++
 .../pinot/core/transport/ServerChannels.java       | 14 ++++++++++
 .../pinot/core/transport/QueryRoutingTest.java     | 10 +++++++-
 7 files changed, 119 insertions(+), 2 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index 98f65a4be9..6dccf789d1 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.metrics;
 
+import com.google.common.base.Preconditions;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
@@ -27,6 +28,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 import org.apache.pinot.common.Utils;
@@ -657,6 +659,34 @@ public abstract class AbstractMetrics<QP extends 
AbstractMetrics.QueryPhase, M e
     pinotGauge.setValueSupplier(valueSupplier);
   }
 
+  /**
+   * Like {@link #setOrUpdateGauge(String, Supplier)}
+   */
+  public void setOrUpdateGauge(final String metricName, final LongSupplier 
valueSupplier) {
+    PinotGauge<Long> pinotGauge = PinotMetricUtils.makeGauge(_metricsRegistry,
+        PinotMetricUtils.makePinotMetricName(_clazz, _metricPrefix + 
metricName),
+        PinotMetricUtils.makePinotGauge(avoid -> valueSupplier.getAsLong()));
+    pinotGauge.setValueSupplier((Supplier<Long>) () -> (Long) 
valueSupplier.getAsLong());
+  }
+
+  /**
+   * Like {@link #setOrUpdateGauge(String, Supplier)} but using a global gauge
+   * @throws IllegalArgumentException if the gauge is not global
+   */
+  public void setOrUpdateGlobalGauge(final G gauge, final Supplier<Long> 
valueSupplier) {
+    Preconditions.checkArgument(gauge.isGlobal(), "Only global gauges should 
be sent to this method");
+    setOrUpdateGauge(gauge.getGaugeName(), valueSupplier);
+  }
+
+  /**
+   * Like {@link #setOrUpdateGauge(String, LongSupplier)} but using a global 
gauge
+   * @throws IllegalArgumentException if the gauge is not global
+   */
+  public void setOrUpdateGlobalGauge(final G gauge, final LongSupplier 
valueSupplier) {
+    Preconditions.checkArgument(gauge.isGlobal(), "Only global gauges should 
be sent to this method");
+    setOrUpdateGauge(gauge.getGaugeName(), valueSupplier);
+  }
+
   /**
    * Removes a global gauge given the key and the gauge
    * @param key the key associated with the gauge
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index 93bf03edbe..25e66eabd0 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.metrics;
 
+import io.netty.buffer.PooledByteBufAllocatorMetric;
 import org.apache.pinot.common.Utils;
 
 
@@ -35,7 +36,27 @@ public enum BrokerGauge implements AbstractMetrics.Gauge {
   RESIZE_TIME_MS("milliseconds", false),
   UNHEALTHY_SERVERS("servers", true),
   TIME_BOUNDARY_DIFFERENCE("milliseconds", false),
-  JVM_HEAP_USED_BYTES("bytes", true);
+  JVM_HEAP_USED_BYTES("bytes", true),
+  NETTY_POOLED_USED_DIRECT_MEMORY("bytes", true),
+  NETTY_POOLED_USED_HEAP_MEMORY("bytes", true),
+  NETTY_POOLED_ARENAS_DIRECT("arenas", true),
+  NETTY_POOLED_ARENAS_HEAP("arenas", true),
+
+  /**
+   * The size of the small cache.
+   * See {@link PooledByteBufAllocatorMetric#smallCacheSize()}
+   */
+  NETTY_POOLED_CACHE_SIZE_SMALL("bytes", true),
+  /**
+   * The size of the normal cache.
+   * See {@link PooledByteBufAllocatorMetric#normalCacheSize()}
+   */
+  NETTY_POOLED_CACHE_SIZE_NORMAL("bytes", true),
+  /**
+   * The cache size used by the allocator for normal arenas
+   */
+  NETTY_POOLED_THREADLOCALCACHE("bytes", true),
+  NETTY_POOLED_CHUNK_SIZE("bytes", true);
 
   private final String _brokerGaugeName;
   private final String _unit;
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index c05baa0cca..626c378146 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.metrics;
 
+import io.netty.buffer.PooledByteBufAllocatorMetric;
 import org.apache.pinot.common.Utils;
 
 
@@ -46,6 +47,26 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false),
   CONSUMPTION_QUOTA_UTILIZATION("ratio", false),
   JVM_HEAP_USED_BYTES("bytes", true),
+  NETTY_POOLED_USED_DIRECT_MEMORY("bytes", true),
+  NETTY_POOLED_USED_HEAP_MEMORY("bytes", true),
+  NETTY_POOLED_ARENAS_DIRECT("arenas", true),
+  NETTY_POOLED_ARENAS_HEAP("arenas", true),
+
+  /**
+   * The size of the small cache.
+   * See {@link PooledByteBufAllocatorMetric#smallCacheSize()}
+   */
+  NETTY_POOLED_CACHE_SIZE_SMALL("bytes", true),
+  /**
+   * The size of the normal cache.
+   * See {@link PooledByteBufAllocatorMetric#normalCacheSize()}
+   */
+  NETTY_POOLED_CACHE_SIZE_NORMAL("bytes", true),
+  /**
+   * The cache size used by the allocator for normal arenas
+   */
+  NETTY_POOLED_THREADLOCALCACHE("bytes", true),
+  NETTY_POOLED_CHUNK_SIZE("bytes", true),
   // Ingestion delay metrics
   REALTIME_INGESTION_DELAY_MS("milliseconds", false),
   END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false),
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
index 43e5f5804b..c2cb9ce872 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.common.metrics;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.concurrent.atomic.AtomicReference;
@@ -42,6 +43,11 @@ public class ServerMetrics extends 
AbstractMetrics<ServerQueryPhase, ServerMeter
     return SERVER_METRICS_INSTANCE.compareAndSet(null, serverMetrics);
   }
 
+  @VisibleForTesting
+  public static void deregister() {
+    SERVER_METRICS_INSTANCE.set(null);
+  }
+
   /**
    * should always call after registration
    */
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index 9bca2a6ca1..915b2eb1f2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.core.transport;
 
 import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocatorMetric;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelInitializer;
@@ -37,6 +39,8 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.config.NettyConfig;
 import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.core.util.OsCheck;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -114,8 +118,21 @@ public class QueryServer {
   public void start() {
     try {
       ServerBootstrap serverBootstrap = new ServerBootstrap();
+
+      PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
+      PooledByteBufAllocatorMetric metric = bufAllocator.metric();
+      ServerMetrics metrics = ServerMetrics.get();
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_USED_DIRECT_MEMORY, 
metric::usedDirectMemory);
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_USED_HEAP_MEMORY, 
metric::usedHeapMemory);
+      metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_ARENAS_DIRECT, 
metric::numDirectArenas);
+      metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_ARENAS_HEAP, 
metric::numHeapArenas);
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_CACHE_SIZE_SMALL, 
metric::smallCacheSize);
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_CACHE_SIZE_NORMAL, 
metric::normalCacheSize);
+      
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_THREADLOCALCACHE, 
metric::numThreadLocalCaches);
+      metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_CHUNK_SIZE, 
metric::chunkSize);
       _channel = serverBootstrap.group(_bossGroup, 
_workerGroup).channel(_channelClass)
           .option(ChannelOption.SO_BACKLOG, 
128).childOption(ChannelOption.SO_KEEPALIVE, true)
+          .option(ChannelOption.ALLOCATOR, bufAllocator)
           .childHandler(new ChannelInitializer<SocketChannel>() {
             @Override
             protected void initChannel(SocketChannel ch) {
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index 82bbf64333..c9fe068ed4 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.core.transport;
 
 import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocatorMetric;
 import io.netty.buffer.Unpooled;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelInitializer;
@@ -151,7 +153,19 @@ public class ServerChannels {
 
     ServerChannel(ServerRoutingInstance serverRoutingInstance) {
       _serverRoutingInstance = serverRoutingInstance;
+      PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
+      PooledByteBufAllocatorMetric metric = bufAllocator.metric();
+      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_DIRECT_MEMORY,
 metric::usedDirectMemory);
+      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_HEAP_MEMORY,
 metric::usedHeapMemory);
+      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_DIRECT, 
metric::numDirectArenas);
+      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_HEAP, 
metric::numHeapArenas);
+      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_SMALL,
 metric::smallCacheSize);
+      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_NORMAL,
 metric::normalCacheSize);
+      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_THREADLOCALCACHE,
 metric::numThreadLocalCaches);
+      
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CHUNK_SIZE, 
metric::chunkSize);
+
       _bootstrap = new 
Bootstrap().remoteAddress(serverRoutingInstance.getHostname(), 
serverRoutingInstance.getPort())
+          .option(ChannelOption.ALLOCATOR, bufAllocator)
           
.group(_eventLoopGroup).channel(_channelClass).option(ChannelOption.SO_KEEPALIVE,
 true)
           .handler(new ChannelInitializer<SocketChannel>() {
             @Override
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 5e1fa04176..2a30eefa2f 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -38,6 +38,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
 import org.apache.pinot.util.TestUtils;
 import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
@@ -77,9 +78,16 @@ public class QueryRoutingTest {
     _requestCount = 0;
   }
 
+  @AfterMethod
+  void deregisterServerMetrics() {
+    ServerMetrics.deregister();
+  }
+
   private QueryServer getQueryServer(int responseDelayMs, byte[] 
responseBytes) {
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
     InstanceRequestHandler handler = new InstanceRequestHandler("server01", 
new PinotConfiguration(),
-        mockQueryScheduler(responseDelayMs, responseBytes), 
mock(ServerMetrics.class), mock(AccessControl.class));
+        mockQueryScheduler(responseDelayMs, responseBytes), serverMetrics, 
mock(AccessControl.class));
+    ServerMetrics.register(serverMetrics);
     return new QueryServer(TEST_PORT, null, handler);
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to