This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 54ee6360ca2 Disable idle mailboxes by default (#16691)
54ee6360ca2 is described below
commit 54ee6360ca29c929524d768abd7cfe21e3c63a21
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Tue Aug 26 13:41:42 2025 +0200
Disable idle mailboxes by default (#16691)
---
.../org/apache/pinot/common/config/GrpcConfig.java | 13 +++++-
.../test/resources/{log4j2.xml => log4j2-test.xml} | 13 ++++++
.../apache/pinot/query/mailbox/MailboxService.java | 14 ++++++-
.../query/mailbox/channel/ChannelManager.java | 46 ++++++++++++++++------
.../apache/pinot/spi/utils/CommonConstants.java | 16 ++++++++
5 files changed, 89 insertions(+), 13 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
index 16ba0323d3f..c3276444f64 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/GrpcConfig.java
@@ -23,7 +23,18 @@ import java.util.Map;
import org.apache.pinot.common.utils.tls.TlsUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
-
+/// Configs used for the gRPC **query** service.
+///
+/// Remember that in Pinot we use different gRPC services for different
purposes:
+/// - **query**: used by Pinot users for executing queries
+/// - **internal**: used by Pinot for internal communication between
servers/broker in MSE. This includes:
+/// - The ability to send query plans from broker to server, and the
mailbox service for sending data between
+/// servers/brokers
+/// - The broker -> server communication for the SSE when
`pinot.broker.request.handler.type` is set to grpc
+/// (see the GrpcBrokerRequestHandler)
+///
+/// This class only affects the **query** service. See ChannelManager,
MailboxService and GrpcMailboxServer to learn
+/// more about the gRPC config used for MSE.
public class GrpcConfig {
public static final String GRPC_TLS_PREFIX = "tls";
public static final String CONFIG_USE_PLAIN_TEXT = "usePlainText";
diff --git a/pinot-integration-tests/src/test/resources/log4j2.xml
b/pinot-integration-tests/src/test/resources/log4j2-test.xml
similarity index 73%
rename from pinot-integration-tests/src/test/resources/log4j2.xml
rename to pinot-integration-tests/src/test/resources/log4j2-test.xml
index 439331f9d7b..afed36b3132 100644
--- a/pinot-integration-tests/src/test/resources/log4j2.xml
+++ b/pinot-integration-tests/src/test/resources/log4j2-test.xml
@@ -23,12 +23,25 @@
<Appenders>
<Console name="console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n"/>
+ <Filters>
+ <BurstFilter level="ERROR" rate="5" maxBurst="10"/>
+ </Filters>
+ </Console>
+ <Console name="spammy" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} %p [%c{1}] [%t] %m%n"/>
+ <Filters>
+ <BurstFilter level="ERROR" rate="1" maxBurst="2"/>
+ </Filters>
</Console>
</Appenders>
+
<Loggers>
<Logger name="org.apache.pinot" level="warn" additivity="false">
<AppenderRef ref="console"/>
</Logger>
+ <Logger name="org.apache.pinot.core.accounting" level="warn"
additivity="false">
+ <AppenderRef ref="spammy"/>
+ </Logger>
<Root level="error">
<AppenderRef ref="console"/>
</Root>
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index 8e572afe478..92dae478321 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -21,6 +21,7 @@ package org.apache.pinot.query.mailbox;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
+import java.time.Duration;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
@@ -89,7 +90,7 @@ public class MailboxService {
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
);
- _channelManager = new ChannelManager(tlsConfig, maxInboundMessageSize);
+ _channelManager = new ChannelManager(tlsConfig, maxInboundMessageSize,
getIdleTimeout(config));
_accessControlFactory = accessControlFactory;
boolean splitBlocks = config.getProperty(
CommonConstants.MultiStageQueryRunner.KEY_OF_ENABLE_DATA_BLOCK_PAYLOAD_SPLIT,
@@ -168,4 +169,15 @@ public class MailboxService {
public void releaseReceivingMailbox(ReceivingMailbox mailbox) {
_receivingMailboxCache.invalidate(mailbox.getId());
}
+
+ private static Duration getIdleTimeout(PinotConfiguration config) {
+ long channelIdleTimeoutSeconds = config.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_CHANNEL_IDLE_TIMEOUT_SECONDS,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_CHANNEL_IDLE_TIMEOUT_SECONDS);
+ if (channelIdleTimeoutSeconds > 0) {
+ return Duration.ofSeconds(channelIdleTimeoutSeconds);
+ }
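+ // Non-positive value means "idle timeout disabled": use a very large but finite timeout (1 year) instead of a maximal value, to avoid overflow when gRPC converts it to other units.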
+ return Duration.ofDays(365);
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
index 8d63b28aa96..f0ff135d9df 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
@@ -21,7 +21,9 @@ package org.apache.pinot.query.mailbox.channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
+import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.config.TlsConfig;
@@ -35,32 +37,54 @@ import
org.apache.pinot.common.utils.grpc.ServerGrpcQueryClient;
* query/job/stages.
*/
public class ChannelManager {
+ /**
+ * Map from (hostname, port) to the ManagedChannel with all known channels
+ */
private final ConcurrentHashMap<Pair<String, Integer>, ManagedChannel>
_channelMap = new ConcurrentHashMap<>();
private final TlsConfig _tlsConfig;
+ /**
+ * The idle timeout for the channel. gRPC offers no explicit switch to turn channel idleness off, so a very large value is used to effectively disable it.
+ *
+ * In general we want to prevent the channel from going idle, so that we
don't have to re-establish the connection
+ * (including TLS negotiation) before sending any message, which increases
the latency of the first query sent after a
+ * period of inactivity. In order to achieve that, we set the idle timeout
to a very large value by default.
+ */
+ private final Duration _idleTimeout;
private final int _maxInboundMessageSize;
- public ChannelManager(@Nullable TlsConfig tlsConfig, int
maxInboundMessageSize) {
+ public ChannelManager(@Nullable TlsConfig tlsConfig, int
maxInboundMessageSize, Duration idleTimeout) {
_tlsConfig = tlsConfig;
_maxInboundMessageSize = maxInboundMessageSize;
+ _idleTimeout = idleTimeout;
}
public ManagedChannel getChannel(String hostname, int port) {
// TODO: Revisit parameters
if (_tlsConfig != null) {
return _channelMap.computeIfAbsent(Pair.of(hostname, port),
- (k) -> NettyChannelBuilder
- .forAddress(k.getLeft(), k.getRight())
- .maxInboundMessageSize(_maxInboundMessageSize)
- .sslContext(ServerGrpcQueryClient.buildSslContext(_tlsConfig))
- .build()
+ (k) -> {
+ NettyChannelBuilder channelBuilder = NettyChannelBuilder
+ .forAddress(k.getLeft(), k.getRight())
+ .maxInboundMessageSize(
+ _maxInboundMessageSize)
+ .sslContext(ServerGrpcQueryClient.buildSslContext(_tlsConfig));
+ return decorate(channelBuilder).build();
+ }
);
} else {
return _channelMap.computeIfAbsent(Pair.of(hostname, port),
- (k) -> ManagedChannelBuilder
- .forAddress(k.getLeft(), k.getRight())
- .maxInboundMessageSize(_maxInboundMessageSize)
- .usePlaintext()
- .build());
+ (k) -> {
+ ManagedChannelBuilder<?> channelBuilder = ManagedChannelBuilder
+ .forAddress(k.getLeft(), k.getRight())
+ .maxInboundMessageSize(
+ _maxInboundMessageSize)
+ .usePlaintext();
+ return decorate(channelBuilder).build();
+ });
}
}
+
+ private ManagedChannelBuilder<?> decorate(ManagedChannelBuilder<?> builder) {
+ return builder.idleTimeout(_idleTimeout.getSeconds(), TimeUnit.SECONDS);
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 1b8fbc16a96..4d2a61c965a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1903,6 +1903,22 @@ public class CommonConstants {
public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
= "pinot.query.runner.max.msg.size.bytes";
public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES =
16 * 1024 * 1024;
+
+ /**
+ * Configuration for channel idle timeout in seconds.
+ *
+ * gRPC channels go idle after a period of inactivity. When a channel is
idle, its resources are released. The next
+ * query using the channel will need to re-establish the connection. This
includes the TLS negotiation and therefore
+ * can increase the latency of the query by some milliseconds.
+ *
+ * In normal Pinot clusters that are continuously serving queries,
channels should never go idle.
+ * But it could affect clusters that are not continuously serving queries.
+ * This is why by default the channel idle timeout is set to -1, which
means that the channel idle timeout is
+ * disabled.
+ */
+ public static final String KEY_OF_CHANNEL_IDLE_TIMEOUT_SECONDS =
"pinot.query.runner.channel.idle.timeout.seconds";
+ public static final long DEFAULT_CHANNEL_IDLE_TIMEOUT_SECONDS = -1;
+
/**
* Enable splitting of data block payload during mailbox transfer.
*/
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]