This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 545210b8e0e Share a single pooled ByteBuf allocator across Netty query transports instead of one per server channel (#18905)
545210b8e0e is described below
commit 545210b8e0e660bb2619dfe951838ec325771680
Author: Yash Mayya <[email protected]>
AuthorDate: Thu Jul 2 09:12:37 2026 -0700
Share a single pooled ByteBuf allocator across Netty query transports instead of one per server channel (#18905)
---
.../PooledByteBufAllocatorWithLimits.java | 37 ++++++++++++++++++++--
.../apache/pinot/core/transport/QueryServer.java | 14 +++++---
.../pinot/core/transport/ServerChannels.java | 28 ++++++++--------
.../pinot/core/transport/QueryServerTest.java | 11 +++++++
.../pinot/core/transport/ServerChannelsTest.java | 31 ++++++++++++++++++
5 files changed, 98 insertions(+), 23 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/PooledByteBufAllocatorWithLimits.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/PooledByteBufAllocatorWithLimits.java
index 54ab85dbeb8..14254d6932e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/PooledByteBufAllocatorWithLimits.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/PooledByteBufAllocatorWithLimits.java
@@ -30,17 +30,42 @@ import org.slf4j.LoggerFactory;
/**
- * Utility class for setting limits in the PooledByteBufAllocator.
+ * Utility class that creates a {@link PooledByteBufAllocator} with a reduced
number of direct arenas to limit the
+ * direct memory retained by the pool, and owns the process-wide shared
instance of it. Thread-safe.
*/
public class PooledByteBufAllocatorWithLimits {
private static final Logger LOGGER =
LoggerFactory.getLogger(PooledByteBufAllocatorWithLimits.class);
+ private static volatile PooledByteBufAllocator
_sharedBufferAllocatorWithLimits;
private PooledByteBufAllocatorWithLimits() {
}
+ /**
+ * Returns the shared allocator, creating it on first use. All unshaded
Netty query transports within the process
+ * (all broker side {@link ServerChannels} and the server side {@link
QueryServer}) must share this single
+ * allocator: pooled arenas retain chunk memory after the buffers allocated
from them are released, and free space
+ * in one allocator's pool can never serve another allocator's allocations,
so per-connection allocators can retain
+ * many times the intended amount of direct memory and exhaust it. Note that
the reduced arena count limits the
+ * worst case retention but is not a hard cap on direct memory usage. The
gRPC based transports use shaded Netty
+ * classes and maintain their own allocators.
+ */
+ public static PooledByteBufAllocator getSharedBufferAllocatorWithLimits() {
+ PooledByteBufAllocator sharedAllocator = _sharedBufferAllocatorWithLimits;
+ if (sharedAllocator == null) {
+ synchronized (PooledByteBufAllocatorWithLimits.class) {
+ sharedAllocator = _sharedBufferAllocatorWithLimits;
+ if (sharedAllocator == null) {
+ sharedAllocator =
getBufferAllocatorWithLimits(PooledByteBufAllocator.DEFAULT.metric());
+ _sharedBufferAllocatorWithLimits = sharedAllocator;
+ }
+ }
+ }
+ return sharedAllocator;
+ }
+
// Reduce the number of direct arenas when using netty channels on broker
and server side to limit the direct
// memory usage
- public static PooledByteBufAllocator
getBufferAllocatorWithLimits(PooledByteBufAllocatorMetric metric) {
+ private static PooledByteBufAllocator
getBufferAllocatorWithLimits(PooledByteBufAllocatorMetric metric) {
int defaultPageSize =
SystemPropertyUtil.getInt("io.netty.allocator.pageSize", 8192);
final int defaultMinNumArena = NettyRuntime.availableProcessors() * 2;
int defaultMaxOrder =
SystemPropertyUtil.getInt("io.netty.allocator.maxOrder", 9);
@@ -48,10 +73,16 @@ public class PooledByteBufAllocatorWithLimits {
long maxDirectMemory = PlatformDependent.maxDirectMemory();
long remainingDirectMemory = maxDirectMemory - getReservedMemory();
+ // Floor the default at 1: this allocator is created once and shared for
the lifetime of the process, so a
+ // depleted direct memory snapshot at creation time must not permanently
disable pooling. An explicit
+ // io.netty.allocator.numDirectArenas=0 still disables direct arenas.
int numDirectArenas = Math.max(0,
SystemPropertyUtil.getInt("io.netty.allocator.numDirectArenas",
- (int) Math.min(defaultMinNumArena, remainingDirectMemory /
defaultChunkSize / 5)));
+ (int) Math.max(1, Math.min(defaultMinNumArena, remainingDirectMemory /
defaultChunkSize / 5))));
boolean useCacheForAllThreads =
SystemPropertyUtil.getBoolean("io.netty.allocator.useCacheForAllThreads",
false);
+ LOGGER.info("Creating PooledByteBufAllocator with numDirectArenas: {},
numHeapArenas: {}, chunkSize: {}, "
+ + "remainingDirectMemory: {}", numDirectArenas,
metric.numHeapArenas(), defaultChunkSize,
+ remainingDirectMemory);
return new PooledByteBufAllocator(true, metric.numHeapArenas(),
numDirectArenas, defaultPageSize, defaultMaxOrder,
metric.smallCacheSize(), metric.normalCacheSize(),
useCacheForAllThreads);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
index 2f2ca7e2b7d..25e1fbcf117 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryServer.java
@@ -37,6 +37,7 @@ import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.internal.PlatformDependent;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.config.NettyConfig;
@@ -117,12 +118,10 @@ public class QueryServer {
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
- PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
- PooledByteBufAllocatorMetric metric = bufAllocator.metric();
- ServerMetrics metrics = ServerMetrics.get();
PooledByteBufAllocator bufAllocatorWithLimits =
-
PooledByteBufAllocatorWithLimits.getBufferAllocatorWithLimits(metric);
- metric = bufAllocatorWithLimits.metric();
+
PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits();
+ PooledByteBufAllocatorMetric metric = bufAllocatorWithLimits.metric();
+ ServerMetrics metrics = ServerMetrics.get();
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_USED_DIRECT_MEMORY,
metric::usedDirectMemory);
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_USED_HEAP_MEMORY,
metric::usedHeapMemory);
metrics.setOrUpdateGlobalGauge(ServerGauge.NETTY_POOLED_ARENAS_DIRECT,
metric::numDirectArenas);
@@ -186,4 +185,9 @@ public class QueryServer {
int getConnectedChannelCount() {
return _allChannels.size();
}
+
+ @VisibleForTesting
+ Set<SocketChannel> getConnectedChannels() {
+ return _allChannels.keySet();
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
index 7f0b091c32c..4bc3fb39f8a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/ServerChannels.java
@@ -83,6 +83,7 @@ public class ServerChannels {
private final EventLoopGroup _eventLoopGroup;
private final Class<? extends SocketChannel> _channelClass;
private final ThreadAccountant _threadAccountant;
+ private final PooledByteBufAllocator _bufAllocatorWithLimits;
private final BrokerMetrics _brokerMetrics = BrokerMetrics.get();
private final ConcurrentHashMap<ServerRoutingInstance, ServerChannel>
_serverToChannelMap = new ConcurrentHashMap<>();
@@ -126,6 +127,17 @@ public class ServerChannels {
_queryRouter = queryRouter;
_tlsConfig = tlsConfig;
_threadAccountant = threadAccountant;
+
+ _bufAllocatorWithLimits =
PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits();
+ PooledByteBufAllocatorMetric metric = _bufAllocatorWithLimits.metric();
+
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_DIRECT_MEMORY,
metric::usedDirectMemory);
+
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_HEAP_MEMORY,
metric::usedHeapMemory);
+
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_DIRECT,
metric::numDirectArenas);
+
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_HEAP,
metric::numHeapArenas);
+
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_SMALL,
metric::smallCacheSize);
+
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_NORMAL,
metric::normalCacheSize);
+
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_THREADLOCALCACHE,
metric::numThreadLocalCaches);
+ _brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CHUNK_SIZE,
metric::chunkSize);
}
public void sendRequest(String rawTableName, AsyncQueryResponse
asyncQueryResponse,
@@ -165,22 +177,8 @@ public class ServerChannels {
ServerChannel(ServerRoutingInstance serverRoutingInstance) {
_serverRoutingInstance = serverRoutingInstance;
- PooledByteBufAllocator bufAllocator = PooledByteBufAllocator.DEFAULT;
- PooledByteBufAllocatorMetric metric = bufAllocator.metric();
- PooledByteBufAllocator bufAllocatorWithLimits =
-
PooledByteBufAllocatorWithLimits.getBufferAllocatorWithLimits(metric);
- metric = bufAllocatorWithLimits.metric();
-
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_DIRECT_MEMORY,
metric::usedDirectMemory);
-
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_USED_HEAP_MEMORY,
metric::usedHeapMemory);
-
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_DIRECT,
metric::numDirectArenas);
-
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_ARENAS_HEAP,
metric::numHeapArenas);
-
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_SMALL,
metric::smallCacheSize);
-
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CACHE_SIZE_NORMAL,
metric::normalCacheSize);
-
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_THREADLOCALCACHE,
metric::numThreadLocalCaches);
-
_brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.NETTY_POOLED_CHUNK_SIZE,
metric::chunkSize);
-
_bootstrap = new
Bootstrap().remoteAddress(serverRoutingInstance.getHostname(),
serverRoutingInstance.getPort())
- .option(ChannelOption.ALLOCATOR,
bufAllocatorWithLimits).group(_eventLoopGroup).channel(_channelClass)
+ .option(ChannelOption.ALLOCATOR,
_bufAllocatorWithLimits).group(_eventLoopGroup).channel(_channelClass)
.option(ChannelOption.SO_KEEPALIVE, true).handler(new
ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
index df72863cd0c..d128737ac10 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryServerTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.transport;
import io.netty.channel.ChannelHandler;
+import io.netty.channel.socket.SocketChannel;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.commons.io.IOUtils;
@@ -34,6 +35,7 @@ import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;
@@ -58,6 +60,10 @@ public class QueryServerTest {
QueryServer server = new QueryServer(0, nettyConfig, tlsConfig,
channelHandler);
server.start();
+ // The server should use the shared process-wide bounded allocator
+ assertSame(server.getChannel().config().getAllocator(),
+ PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits());
+
final InetSocketAddress serverAddress = server.getChannel().localAddress();
assertTrue(connectionOk(serverAddress));
@@ -82,6 +88,11 @@ public class QueryServerTest {
TestUtils.waitForCondition(aVoid -> server.getConnectedChannelCount() >
0, 5_000L,
"Channel was not registered in _allChannels");
+ // The accepted child channels (which allocate the request/response
buffers) must also use the shared allocator
+ SocketChannel connectedChannel =
server.getConnectedChannels().iterator().next();
+ assertSame(connectedChannel.config().getAllocator(),
+
PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits());
+
socket.close();
TestUtils.waitForCondition(aVoid -> server.getConnectedChannelCount() ==
0, 5_000L,
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
index 8d5aabbfc15..0dd1bbff383 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/ServerChannelsTest.java
@@ -19,8 +19,10 @@
package org.apache.pinot.core.transport;
import com.sun.net.httpserver.HttpServer;
+import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import org.apache.pinot.common.config.NettyConfig;
@@ -42,6 +44,8 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertSame;
public class ServerChannelsTest {
@@ -91,6 +95,33 @@ public class ServerChannelsTest {
}
}
+ @Test
+ public void testChannelsShareBufferAllocator() {
+ ServerChannels serverChannels =
+ new ServerChannels(mock(QueryRouter.class), null, null,
ThreadAccountantUtils.getNoOpAccountant());
+ ServerChannels otherServerChannels =
+ new ServerChannels(mock(QueryRouter.class), null, null,
ThreadAccountantUtils.getNoOpAccountant());
+ try {
+ ByteBufAllocator allocator = getBootstrapAllocator(
+ serverChannels.getOrCreateServerChannel(new
ServerRoutingInstance("localhost", 12345, TableType.OFFLINE)));
+ assertNotNull(allocator);
+ assertSame(allocator,
PooledByteBufAllocatorWithLimits.getSharedBufferAllocatorWithLimits());
+ // All channels created by a ServerChannels use the same allocator
+ assertSame(getBootstrapAllocator(serverChannels.getOrCreateServerChannel(
+ new ServerRoutingInstance("localhost", 12346, TableType.REALTIME))),
allocator);
+ // Channels created by another ServerChannels instance (e.g. the TLS
one) share it as well
+
assertSame(getBootstrapAllocator(otherServerChannels.getOrCreateServerChannel(
+ new ServerRoutingInstance("localhost", 12347, TableType.OFFLINE))),
allocator);
+ } finally {
+ serverChannels.shutDown();
+ otherServerChannels.shutDown();
+ }
+ }
+
+ private static ByteBufAllocator
getBootstrapAllocator(ServerChannels.ServerChannel serverChannel) {
+ return (ByteBufAllocator)
serverChannel._bootstrap.config().options().get(ChannelOption.ALLOCATOR);
+ }
+
@SuppressWarnings("unchecked")
@Test
public void testWriteFailureClosesChannelAndFailsQuery() {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]