This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5bfb00818d Add metrics to export netty direct memory used and max (#11575)
5bfb00818d is described below
commit 5bfb00818da767a31bf8f0c7a9dd4976e99c74db
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Mon Oct 2 20:33:09 2023 +0200
Add metrics to export netty direct memory used and max (#11575)
* Add metrics to export netty direct memory used and max
* Fix new netty metrics to use actual suppliers
* Add utility methods AbstractMetrics.setOrUpdateGlobalGauge
* Fix tests
* Initialize ServerMetrics static instance in tests
---
.../pinot/common/metrics/AbstractMetrics.java | 30 ++++++++++++++++++++++
.../apache/pinot/common/metrics/BrokerGauge.java | 23 ++++++++++++++++-
.../apache/pinot/common/metrics/ServerGauge.java | 21 +++++++++++++++
.../apache/pinot/common/metrics/ServerMetrics.java | 6 +++++
.../apache/pinot/core/transport/QueryServer.java | 17 ++++++++++++
.../pinot/core/transport/ServerChannels.java | 14 ++++++++++
.../pinot/core/transport/QueryRoutingTest.java | 10 +++++++-
7 files changed, 119 insertions(+), 2 deletions(-)
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
index 98f65a4be9..6dccf789d1 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.metrics;
+import com.google.common.base.Preconditions;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
@@ -27,6 +28,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.pinot.common.Utils;
@@ -657,6 +659,34 @@ public abstract class AbstractMetrics<QP extends AbstractMetrics.QueryPhase, M e
pinotGauge.setValueSupplier(valueSupplier);
}
+ /**
+ * Like {@link #setOrUpdateGauge(String, Supplier)}
+ */
+ public void setOrUpdateGauge(final String metricName, final LongSupplier valueSupplier) {
+ PinotGauge<Long> pinotGauge = PinotMetricUtils.makeGauge(_metricsRegistry,
+ PinotMetricUtils.makePinotMetricName(_clazz, _metricPrefix + metricName),
+ PinotMetricUtils.makePinotGauge(avoid -> valueSupplier.getAsLong()));
+ pinotGauge.setValueSupplier((Supplier<Long>) () -> (Long) valueSupplier.getAsLong());
+ }
+
+ /**
+ * Like {@link #setOrUpdateGauge(String, Supplier)} but using a global gauge
+ * @throws IllegalArgumentException if the gauge is not global
+ */
+ public void setOrUpdateGlobalGauge(final G gauge, final Supplier<Long> valueSupplier) {
+ Preconditions.checkArgument(gauge.isGlobal(), "Only global gauges should be sent to this method");
+ setOrUpdateGauge(gauge.getGaugeName(), valueSupplier);
+ }
+
+ /**
+ * Like {@link #setOrUpdateGauge(String, LongSupplier)} but using a global gauge
+ * @throws IllegalArgumentException if the gauge is not global
+ */
+ public void setOrUpdateGlobalGauge(final G gauge, final LongSupplier valueSupplier) {
+ Preconditions.checkArgument(gauge.isGlobal(), "Only global gauges should be sent to this method");
+ setOrUpdateGauge(gauge.getGaugeName(), valueSupplier);
+ }
+
/**
* Removes a global gauge given the key and the gauge
* @param key the key associated with the gauge
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index 93bf03edbe..25e66eabd0 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.metrics;
+import io.netty.buffer.PooledByteBufAllocatorMetric;
import org.apache.pinot.common.Utils;
@@ -35,7 +36,27 @@ public enum BrokerGauge implements AbstractMetrics.Gauge {
RESIZE_TIME_MS("milliseconds", false),
UNHEALTHY_SERVERS("servers", true),
TIME_BOUNDARY_DIFFERENCE("milliseconds", false),
- JVM_HEAP_USED_BYTES("bytes", true);
+ JVM_HEAP_USED_BYTES("bytes", true),
+ NETTY_POOLED_USED_DIRECT_MEMORY("bytes", true),
+ NETTY_POOLED_USED_HEAP_MEMORY("bytes", true),
+ NETTY_POOLED_ARENAS_DIRECT("arenas", true),
+ NETTY_POOLED_ARENAS_HEAP("arenas", true),
+
+ /**
+ * The size of the small cache.
+ * See {@link PooledByteBufAllocatorMetric#smallCacheSize()}
+ */
+ NETTY_POOLED_CACHE_SIZE_SMALL("bytes", true),
+ /**
+ * The size of the normal cache.
+ * See {@link PooledByteBufAllocatorMetric#normalCacheSize()}
+ */
+ NETTY_POOLED_CACHE_SIZE_NORMAL("bytes", true),
+ /**
+ * The cache size used by the allocator for normal arenas
+ */
+ NETTY_POOLED_THREADLOCALCACHE("bytes", true),
+ NETTY_POOLED_CHUNK_SIZE("bytes", true);
private final String _brokerGaugeName;
private final String _unit;
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index c05baa0cca..626c378146 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.metrics;
+import io.netty.buffer.PooledByteBufAllocatorMetric;
import org.apache.pinot.common.Utils;
@@ -46,6 +47,26 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
DEDUP_PRIMARY_KEYS_COUNT("dedupPrimaryKeysCount", false),
CONSUMPTION_QUOTA_UTILIZATION("ratio", false),
JVM_HEAP_USED_BYTES("bytes", true),
+ NETTY_POOLED_USED_DIRECT_MEMORY("bytes", true),
+ NETTY_POOLED_USED_HEAP_MEMORY("bytes", true),
+ NETTY_POOLED_ARENAS_DIRECT("arenas", true),
+ NETTY_POOLED_ARENAS_HEAP("arenas", true),
+
+ /**
+ * The size of the small cache.
+ * See {@link PooledByteBufAllocatorMetric#smallCacheSize()}
+ */
+ NETTY_POOLED_CACHE_SIZE_SMALL("bytes", true),
+ /**
+ * The size of the normal cache.
+ * See {@link PooledByteBufAllocatorMetric#normalCacheSize()}
+ */
+ NETTY_POOLED_CACHE_SIZE_NORMAL("bytes", true),
+ /**
+ * The cache size used by the allocator for normal arenas
+ */
+ NETTY_POOLED_THREADLOCALCACHE("bytes", true),
+ NETTY_POOLED_CHUNK_SIZE("bytes", true),
// Ingestion delay metrics
REALTIME_INGESTION_DELAY_MS("milliseconds", false),
END_TO_END_REALTIME_INGESTION_DELAY_MS("milliseconds", false),
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
index 43e5f5804b..c2cb9ce872 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerMetrics.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.common.metrics;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;
@@ -42,6 +43,11 @@ public class ServerMetrics extends AbstractMetrics<ServerQueryPhase, ServerMeter
return SERVER_METRICS_INSTANCE.compareAndSet(null, serverMetrics);
}
+ @VisibleForTesting
+ public static void deregister() {
+ SERVER_METRICS_INSTANCE.set(null);
+ }
+
/**
* should always call after registration
*/
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index 9bca2a6ca1..915b2eb1f2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -19,6 +19,8 @@
package org.apache.pinot.core.transport;
import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
@@ -37,6 +39,8 @@ import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.config.NettyConfig;
import org.apache.pinot.common.config.TlsConfig;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.core.util.OsCheck;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -114,8 +118,21 @@ public class QueryServer {
public void start() {
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
+
+ PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
+ PooledByteBufAllocatorMetric metric = bufAllocator.metric();
+ ServerMetrics metrics = ServerMetrics.get();
+ metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_USED_DIRECT_MEMORY, metric::usedDirectMemory);
+ metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_USED_HEAP_MEMORY, metric::usedHeapMemory);
+ metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_ARENAS_DIRECT, metric::numDirectArenas);
+ metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_ARENAS_HEAP, metric::numHeapArenas);
+ metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_CACHE_SIZE_SMALL, metric::smallCacheSize);
+ metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_CACHE_SIZE_NORMAL, metric::normalCacheSize);
+ metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_THREADLOCALCACHE, metric::numThreadLocalCaches);
+ metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_CHUNK_SIZE, metric::chunkSize);
_channel = serverBootstrap.group(_bossGroup, _workerGroup).channel(_channelClass)
.option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.ALLOCATOR, bufAllocator)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index 82bbf64333..c9fe068ed4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -19,6 +19,8 @@
package org.apache.pinot.core.transport;
import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.PooledByteBufAllocatorMetric;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
@@ -151,7 +153,19 @@ public class ServerChannels {
ServerChannel(ServerRoutingInstance serverRoutingInstance) {
_serverRoutingInstance = serverRoutingInstance;
+ PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
+ PooledByteBufAllocatorMetric metric = bufAllocator.metric();
+ _brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_DIRECT_MEMORY, metric::usedDirectMemory);
+ _brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_HEAP_MEMORY, metric::usedHeapMemory);
+ _brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_DIRECT, metric::numDirectArenas);
+ _brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_HEAP, metric::numHeapArenas);
+ _brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_SMALL, metric::smallCacheSize);
+ _brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_NORMAL, metric::normalCacheSize);
+ _brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_THREADLOCALCACHE, metric::numThreadLocalCaches);
+ _brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CHUNK_SIZE, metric::chunkSize);
+
_bootstrap = new Bootstrap().remoteAddress(serverRoutingInstance.getHostname(), serverRoutingInstance.getPort())
+ .option(ChannelOption.ALLOCATOR, bufAllocator)
.group(_eventLoopGroup).channel(_channelClass).option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 5e1fa04176..2a30eefa2f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -38,6 +38,7 @@ import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.apache.pinot.util.TestUtils;
import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -77,9 +78,16 @@ public class QueryRoutingTest {
_requestCount = 0;
}
+ @AfterMethod
+ void deregisterServerMetrics() {
+ ServerMetrics.deregister();
+ }
+
private QueryServer getQueryServer(int responseDelayMs, byte[] responseBytes) {
+ ServerMetrics serverMetrics = mock(ServerMetrics.class);
InstanceRequestHandler handler = new InstanceRequestHandler("server01", new PinotConfiguration(),
- mockQueryScheduler(responseDelayMs, responseBytes), mock(ServerMetrics.class), mock(AccessControl.class));
+ mockQueryScheduler(responseDelayMs, responseBytes), serverMetrics, mock(AccessControl.class));
+ ServerMetrics.register(serverMetrics);
return new QueryServer(TEST_PORT, null, handler);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]