This is an automated email from the ASF dual-hosted git repository.
gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new f00dc04af11 [multistage] Sender-side gRPC back-pressure for mailbox +
MAILBOX_CLIENT_USED_* gauges (#18519)
f00dc04af11 is described below
commit f00dc04af111f4ba4e5ef7c92e93c0eef1dc3082
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed May 27 18:13:32 2026 +0200
[multistage] Sender-side gRPC back-pressure for mailbox +
MAILBOX_CLIENT_USED_* gauges (#18519)
---
.../apache/pinot/common/metrics/BrokerGauge.java | 4 +
.../apache/pinot/common/metrics/ServerGauge.java | 4 +
.../pinot/perf/BenchmarkGrpcMailboxSend.java | 263 ++++++++++++++
.../pinot/query/mailbox/GrpcSendingMailbox.java | 376 +++++++++++++++++++--
.../apache/pinot/query/mailbox/MailboxService.java | 101 +++++-
.../query/mailbox/channel/ChannelManager.java | 38 ++-
.../query/mailbox/channel/GrpcMailboxServer.java | 68 +++-
.../mailbox/channel/MailboxContentObserver.java | 19 +-
.../mailbox/GrpcSenderBackpressureOffPathTest.java | 187 ++++++++++
.../query/mailbox/GrpcSenderBackpressureTest.java | 251 ++++++++++++++
.../GrpcSenderBackpressureTightGateTest.java | 220 ++++++++++++
.../query/mailbox/GrpcSendingMailboxTest.java | 230 ++++++++++++-
.../query/mailbox/channel/ChannelManagerTest.java | 35 +-
.../channel/GrpcMailboxServerValidationTest.java | 71 ++++
.../channel/MailboxContentObserverTest.java | 60 +++-
.../apache/pinot/spi/utils/CommonConstants.java | 133 ++++++++
16 files changed, 2016 insertions(+), 44 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index 3d5b2ae9366..b730aa0e9d5 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -106,6 +106,10 @@ public enum BrokerGauge implements AbstractMetrics.Gauge {
MAILBOX_SERVER_THREADLOCALCACHE("bytes", true),
MAILBOX_SERVER_CHUNK_SIZE("bytes", true),
+ // MailboxService gRPC client (outbound to peer mailboxes) memory metrics
+ MAILBOX_CLIENT_USED_DIRECT_MEMORY("bytes", true),
+ MAILBOX_CLIENT_USED_HEAP_MEMORY("bytes", true),
+
/// Exports the max amount of direct memory that can be allocated by the
shaded Netty code used by gRPC
/// It is basically an adaptor for
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.maxDirectMemory()
///
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index c34990d0a23..c2dba897aec 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -126,6 +126,10 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
MAILBOX_SERVER_THREADLOCALCACHE("bytes", true),
MAILBOX_SERVER_CHUNK_SIZE("bytes", true),
+ // MailboxService gRPC client (outbound to peer mailboxes) memory metrics
+ MAILBOX_CLIENT_USED_DIRECT_MEMORY("bytes", true),
+ MAILBOX_CLIENT_USED_HEAP_MEMORY("bytes", true),
+
/// Exports the max amount of direct memory that can be allocated by Netty
/// It is basically an adaptor for
io.netty.util.internal.PlatformDependent.maxDirectMemory()
///
diff --git
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkGrpcMailboxSend.java
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkGrpcMailboxSend.java
new file mode 100644
index 00000000000..ff4bff51577
--- /dev/null
+++
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkGrpcMailboxSend.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.SendingMailbox;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+/// Benchmark for the gRPC sender → receiver MSE mailbox path.
+///
+/// One @Benchmark invocation sends a fixed total payload (`TOTAL_BYTES = 128
MiB`) split into
+/// pre-computed `MseBlock.Data` blocks of size `_blockSizeBytes`, and waits
until every block has
+/// been polled out of the receiver mailbox. Measured time is therefore
end-to-end "ship a request
+/// of this size and process it on the other side" — closer to the unit Pinot
actually cares about
+/// than throughput of one-block-at-a-time hot-loop send.
+///
+/// Axes:
+///
+/// * `_blockSizeBytes` — three representative block sizes:
+/// - `8 KiB` → 16384 blocks per invocation; sender is dominated by
per-block overhead.
+/// - `8 MiB` → 16 blocks; one block fits in a single gRPC chunk
+/// (`maxInboundMessageSize / 2 ≈ 8 MiB`).
+/// - `32 MiB` → 4 blocks; each block is split by `toByteStrings` into
~4 gRPC chunks.
+/// * `_backpressureEnabled` — toggles the `GrpcSendingMailbox`
`isReady()`-gate. `false` reverts
+/// to the pre-fix unconditional `onNext` (kill-switch).
+/// * `_flowControlWindowBytes` — HTTP/2 per-stream inbound window the
receiver advertises
+/// (`pinot.query.runner.grpc.flow.control.window.bytes`). Sweeps `{64 KiB,
1 MiB, 64 MiB}`,
+/// spanning the historical default, BDP-estimated LAN value, and the new
MB-scale default.
+///
+/// The drainer runs as a separate thread (modelling the cross-operator
boundary in a real query),
+/// blocks on a `Semaphore` released by the receiver's `Reader` callback, and
counts down a
+/// per-invocation `CountDownLatch` for each polled block. The @Benchmark
thread sends every block
+/// then awaits the latch.
+///
+/// Run with `org.openjdk.jmh.Main
org.apache.pinot.perf.BenchmarkGrpcMailboxSend`.
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(value = 1, jvmArgsAppend = {
+ "-Xms2g", "-Xmx4g",
+ "-XX:MaxDirectMemorySize=4g"
+})
+@Warmup(iterations = 2, time = 5)
+@Measurement(iterations = 3, time = 10)
+@State(Scope.Benchmark)
+public class BenchmarkGrpcMailboxSend {
+
+ /// Total bytes shipped per @Benchmark invocation. Sized so that even at 32
MiB-per-block we
+ /// get a handful of blocks to bound the per-invocation latency.
+ private static final long TOTAL_BYTES = 128L * 1024 * 1024;
+
+ private static final DataSchema SCHEMA = new DataSchema(
+ new String[]{"payload"}, new ColumnDataType[]{ColumnDataType.STRING});
+
+ /// Approximate bytes per `MseBlock.Data` payload. Each block carries a
single String column,
+ /// one row, where the String is roughly `_blockSizeBytes` characters (ASCII
→ 1 byte per char
+ /// when serialised on the wire, ~2 bytes on heap). The number of blocks per
invocation is
+ /// `ceil(TOTAL_BYTES / _blockSizeBytes)`.
+ @Param({"8192", "8388608", "33554432"})
+ public int _blockSizeBytes;
+
+ /// `true` exercises the sender-side `isReady()`-gate; `false` restores the
pre-fix
+ /// unconditional `onNext` (kill-switch). Wired via
+ /// `pinot.query.runner.grpc.sender.backpressure.enabled`.
+ @Param({"true", "false"})
+ public boolean _backpressureEnabled;
+
+ /// HTTP/2 per-stream flow-control window the receiver advertises, in bytes
+ /// (`pinot.query.runner.grpc.flow.control.window.bytes`):
+ /// * `65535` (~64 KiB) — historical gRPC default; gate engages
frequently.
+ /// * `1048576` (~1 MiB) — typical BDP-estimated LAN value.
+ /// * `67108864` (64 MiB) — Pinot's new default; gate rarely engages under
typical loads.
+ @Param({"65535", "1048576", "67108864"})
+ public int _flowControlWindowBytes;
+
+ private MailboxService _senderService;
+ private MailboxService _receiverService;
+ private SendingMailbox _sender;
+ private ReceivingMailbox _receiver;
+ private List<RowHeapDataBlock> _blocks;
+
+ // Drainer signalling.
+ private final Semaphore _readSignal = new Semaphore(0);
+ private final AtomicBoolean _stop = new AtomicBoolean();
+ private Thread _drainer;
+ private volatile CountDownLatch _drainedLatch;
+
+ @Setup
+ public void setup()
+ throws IOException {
+ // The GrpcMailboxServer fail-fast validation requires flowControlWindow
>= maxInboundMessageSize. The narrow window
+ // axis values (65535, 1 MiB) are smaller than the 16 MiB default for
max-inbound-message-size, so we also clamp the
+ // max-msg-size down to the window. Math.min keeps the 64 MiB-window cell
using the production default 16 MiB
+ // max-msg-size; the two narrow-window cells pin max-msg-size down to the
window so the validation passes.
+ // Manual inbound flow control defaults to off; this bench specifically
measures the feature-enabled
+ // throughput across window sizes, so we explicitly opt in and set the
credit to 128 (the original
+ // "as designed" value before the production default was switched to 1).
+ PinotConfiguration cfg = new PinotConfiguration(Map.of(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED,
_backpressureEnabled,
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED,
true,
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT, 128,
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES,
_flowControlWindowBytes,
+
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+ Math.min(_flowControlWindowBytes,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)));
+ _senderService = new MailboxService("localhost", availablePort(),
InstanceType.SERVER, cfg);
+ _senderService.start();
+ _receiverService = new MailboxService("localhost", availablePort(),
InstanceType.SERVER, cfg);
+ _receiverService.start();
+
+ // Inlined MailboxIdUtils.toMailboxId(1L, 1, 0, 0, 0) format
+ // (requestId_senderStage_senderWorker_recvStage_recvWorker).
+ String mailboxId = "1_1_0_0_0";
+ long deadlineMs = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
+ _sender = _senderService.getSendingMailbox("localhost",
_receiverService.getPort(),
+ mailboxId, deadlineMs, new
StatMap<>(MailboxSendOperator.StatKey.class));
+ _receiver = _receiverService.getReceivingMailbox(mailboxId);
+ // Reader callback fires once per `offer` — wake the drainer.
+ _receiver.registeredReader(_readSignal::release);
+
+ // Pre-compute the block list. All blocks share the same payload String
(immutable, so
+ // sharing is safe and keeps heap usage bounded). The serialiser still
emits independent
+ // byte buffers per send.
+ int numBlocks = (int) ((TOTAL_BYTES + _blockSizeBytes - 1) /
_blockSizeBytes);
+ char[] chars = new char[_blockSizeBytes];
+ Arrays.fill(chars, 'x');
+ String payload = new String(chars);
+ _blocks = new ArrayList<>(numBlocks);
+ for (int i = 0; i < numBlocks; i++) {
+ _blocks.add(new RowHeapDataBlock(
+ Collections.singletonList(new Object[]{payload}), SCHEMA));
+ }
+
+ // Drainer: block on the reader signal, then drain everything available
with non-blocking
+ // poll. The semaphore may accumulate spurious permits (we poll more
aggressively than the
+ // reader fires) but those just cause a no-op wakeup later, which is
harmless.
+ _drainer = new Thread(() -> {
+ while (!_stop.get()) {
+ try {
+ _readSignal.acquire();
+ } catch (InterruptedException e) {
+ return;
+ }
+ while (_receiver.poll() != null) {
+ CountDownLatch latch = _drainedLatch;
+ if (latch != null) {
+ latch.countDown();
+ }
+ }
+ }
+ }, "bench-grpc-mailbox-drainer");
+ _drainer.setDaemon(true);
+ _drainer.start();
+ }
+
+ private static int availablePort()
+ throws IOException {
+ try (ServerSocket socket = new ServerSocket(0)) {
+ return socket.getLocalPort();
+ }
+ }
+
+ @TearDown
+ public void teardown()
+ throws InterruptedException {
+ try {
+ _sender.cancel(new RuntimeException("bench done"));
+ } catch (Exception ignored) {
+ // best-effort
+ }
+ _stop.set(true);
+ _readSignal.release();
+ _drainer.join(TimeUnit.SECONDS.toMillis(10));
+ _senderService.shutdown();
+ _receiverService.shutdown();
+ }
+
+ /// `GrpcSendingMailbox.send` calls
`QueryThreadContext.checkTerminationAndSampleUsage`, which
+ /// requires a context to be open on the calling thread. Opening it as a
thread-scoped @State
+ /// ensures the JMH worker thread has one open across the iteration.
+ @State(Scope.Thread)
+ public static class ThreadCtx {
+ private QueryThreadContext _ctx;
+
+ @Setup
+ public void setup() {
+ _ctx = QueryThreadContext.openForMseTest();
+ }
+
+ @TearDown
+ public void tearDown() {
+ _ctx.close();
+ }
+ }
+
+ @Benchmark
+ public void sendRequest(ThreadCtx ignored)
+ throws InterruptedException {
+ CountDownLatch latch = new CountDownLatch(_blocks.size());
+ _drainedLatch = latch;
+ for (RowHeapDataBlock block : _blocks) {
+ _sender.send(block);
+ }
+ latch.await();
+ }
+
+ public static void main(String[] args)
+ throws Exception {
+ new Runner(new OptionsBuilder()
+ .include(BenchmarkGrpcMailboxSend.class.getSimpleName())
+ .build()).run();
+ }
+}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 75db17a105e..6079f72b9e7 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -22,8 +22,9 @@ import com.google.common.collect.Maps;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import io.grpc.Metadata;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.MetadataUtils;
-import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -32,10 +33,13 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.MetadataBlock;
import org.apache.pinot.common.datatable.StatMap;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
+import org.apache.pinot.common.proto.Mailbox.MailboxStatus;
import org.apache.pinot.common.proto.PinotMailboxGrpc;
import org.apache.pinot.query.mailbox.channel.ChannelManager;
import org.apache.pinot.query.mailbox.channel.ChannelUtils;
@@ -56,6 +60,10 @@ import org.slf4j.LoggerFactory;
/**
* gRPC implementation of the {@link SendingMailbox}. The gRPC stream is
created on the first call to {@link #send}.
+ *
+ * <p>Thread-safety: {@code _readyLock} serializes every call to {@code
_contentObserver.onNext} / {@code onCompleted}.
+ * The underlying {@link ClientCallStreamObserver} is not thread-safe, and the
sender, cancel, and EOS paths can run
+ * concurrently. Acquire {@code _readyLock} around any new outbound call site
you add.
*/
public class GrpcSendingMailbox implements SendingMailbox {
private static final Logger LOGGER =
LoggerFactory.getLogger(GrpcSendingMailbox.class);
@@ -70,19 +78,36 @@ public class GrpcSendingMailbox implements SendingMailbox {
private final StatMap<MailboxSendOperator.StatKey> _statMap;
private final MailboxStatusObserver _statusObserver = new
MailboxStatusObserver();
private final int _maxByteStringSize;
+ /// Kill-switch for the sender-side `isReady()` gate. When `false`,
`awaitReady` short-circuits like the bypass
+ /// path and the sender pushes unconditionally — restoring the pre-1.6
behaviour. Plumbed from
+ /// `pinot.query.runner.grpc.sender.backpressure.enabled` so it can be
flipped without code changes if the gate
+ /// causes a regression in production, and also used by
`BenchmarkGrpcMailboxSend` for A/B measurements.
+ private final boolean _backpressureEnabled;
/// Indicates whether the sending side has attempted to close the mailbox
(either via complete() or cancel()).
private volatile boolean _senderSideClosed;
- private StreamObserver<MailboxContent> _contentObserver;
+ /// Guards [#_readyCond], serializes every outbound call on [#_contentObserver], and protects that
+ /// field's lazy initialization. [#_contentObserver] is normally written once by the sending thread on
+ /// its first call to [#sendInternal]. That field (not this lock) is declared `volatile` because
+ /// [#cancel] and [#close] can read it from a different thread (e.g. an external cancel from an OpChain
+ /// on-failure callback or a watchdog in tests), and we need a happens-before edge for the sender's
+ /// lazy initialization.
+ private final ReentrantLock _readyLock = new ReentrantLock();
+ /// Signalled whenever any of the predicates `awaitReady()` waits on may
have changed: the gRPC outbound becomes
+ /// ready, the receiver acknowledges a chunk, the receiver-side stream
closes (success or error), or the sender
+ /// itself is cancelled. Multiple producers fire the signal; the waiter
always re-checks the predicates after
+ /// waking up.
+ private final Condition _readyCond = _readyLock.newCondition();
+
+ private volatile ClientCallStreamObserver<MailboxContent> _contentObserver;
public GrpcSendingMailbox(String id, ChannelManager channelManager, String
hostname, int port, long deadlineMs,
- StatMap<MailboxSendOperator.StatKey> statMap, int maxInboundMessageSize)
{
+ StatMap<MailboxSendOperator.StatKey> statMap, int maxInboundMessageSize,
boolean backpressureEnabled) {
_id = id;
_channelManager = channelManager;
_hostname = hostname;
_port = port;
_deadlineMs = deadlineMs;
_statMap = statMap;
+ _backpressureEnabled = backpressureEnabled;
// TODO: tune the maxByteStringSize based on experiments. We know the
maxInboundMessageSize on the receiver side,
// but we want to leave some room for extra stuff for other fields like
metadata, mailbox id, etc, whose size
// we don't know at the time of writing into the stream as it is
serialized by protobuf.
@@ -102,10 +127,52 @@ public class GrpcSendingMailbox implements SendingMailbox
{
@Override
public void send(MseBlock.Eos block, List<DataBuffer> serializedStats) {
- if (sendInternal(block, serializedStats)) {
- LOGGER.debug("Completing mailbox: {}", _id);
- _contentObserver.onCompleted();
+ // EOS blocks (success or error) carry control-plane info — including the
original error code on the error path —
+ // and must always reach the receiver, so they bypass the back-pressure
gate. Bypassing also disables the
+ // cooperative termination poll inside [#awaitReady]; without it, a
terminate signal raised while the sender is
+ // mid-way through pushing an error EOS would unwind [#sendInternal] with
a TerminationException, leave
+ // [#_senderSideClosed] false, and let [#cancel] run and overwrite the
original error code with
+ // QUERY_CANCELLATION on the receiver side.
+ //
+ // Race against a concurrent [#cancel]: both paths perform an early
`isTerminated()` check before taking
+ // [#_readyLock], so two threads can both pass that check and enter their
respective lock sections. The cancel
+ // path acquires the lock first, pushes its own error EOS payload (under
`bypassReady=true`) and calls
+ // `_contentObserver.onCompleted()` — the gRPC client stream is now
half-closed. When this thread then enters
+ // its own [#sendInternal] -> [#processAndSend] -> [#sendContent] under
the lock, the in-lock
+ // `isTerminated()` re-check is skipped because `bypassReady=true`, so
`_contentObserver.onNext()` is invoked on
+ // an already-half-closed observer and raises `IllegalStateException("call
already half-closed")`. The
+ // subsequent `_contentObserver.onCompleted()` would raise the same error.
We wrap the whole EOS push +
+ // half-close in a `try/catch(IllegalStateException)` and treat it as a
benign duplicate close — the cancel
+ // path has already delivered an EOS, so the receiver will still get a
terminal signal. Matches the
+ // defensive `catch(Exception)` in [#cancel] for the symmetric edge.
+ boolean sent;
+ try {
+ sent = sendInternal(block, serializedStats, /* bypassReady */ true);
+ } catch (IllegalStateException e) {
+ // Concurrent cancel won the race and half-closed the stream while we
were pushing the EOS payload.
+ LOGGER.debug("EOS send raced with cancel on already half-closed stream
for mailbox: {}", _id, e);
+ // Make sure isTerminated() observes us as closed so any downstream
caller doesn't keep trying.
_senderSideClosed = true;
+ return;
+ }
+ if (sent) {
+ LOGGER.debug("Completing mailbox: {}", _id);
+ // _readyLock serializes outbound observer calls. Holding it across the
flag write + onCompleted() makes the
+ // half-close atomic with respect to any racing sendContent() /
cancel(), so two threads cannot call onNext()
+ // / onCompleted() on the same non-thread-safe ClientCallStreamObserver.
Setting _senderSideClosed before
+ // onCompleted() also means a sendContent() that observes isTerminated()
under the lock will skip its onNext().
+ _readyLock.lock();
+ try {
+ _senderSideClosed = true;
+ _contentObserver.onCompleted();
+ } catch (IllegalStateException e) {
+ // The cancel path may have raced through its own onCompleted()
between our sendContent() returning and our
+ // acquiring _readyLock here (cancel takes its own lock after we
release ours in sendContent's finally).
+ // Treat the duplicate half-close as benign — same shape as the cancel
path's catch(Exception).
+ LOGGER.debug("EOS half-close raced with cancel on already half-closed
stream for mailbox: {}", _id, e);
+ } finally {
+ _readyLock.unlock();
+ }
} else {
LOGGER.warn("Trying to send EOS to the already terminated mailbox: {}",
_id);
}
@@ -113,6 +180,10 @@ public class GrpcSendingMailbox implements SendingMailbox {
/// Tries to send the block to the receiver. Returns true if the block is
sent, false otherwise.
private boolean sendInternal(MseBlock block, List<DataBuffer>
serializedStats) {
+ // Gated default path: bypassReady=false, so each chunk goes through the [#awaitReady] back-pressure
+ // gate (which itself short-circuits when the sender back-pressure kill-switch is off).
+ return sendInternal(block, serializedStats, /* bypassReady */ false);
+ }
+
+ private boolean sendInternal(MseBlock block, List<DataBuffer>
serializedStats, boolean bypassReady) {
if (isTerminated() || (isEarlyTerminated() && block.isData())) {
LOGGER.debug("==[GRPC SEND]== terminated or early terminated mailbox.
Skipping sending message {} to: {}",
block, _id);
@@ -121,11 +192,9 @@ public class GrpcSendingMailbox implements SendingMailbox {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("==[GRPC SEND]== sending message " + block + " to: " + _id);
}
- if (_contentObserver == null) {
- _contentObserver = getContentObserver();
- }
+ ensureContentObserverInitialized();
try {
- processAndSend(block, serializedStats);
+ processAndSend(block, serializedStats, bypassReady);
} catch (IOException e) {
throw new QueryException(QueryErrorCode.INTERNAL, "Failed to split and
send mailbox", e);
}
@@ -137,11 +206,19 @@ public class GrpcSendingMailbox implements SendingMailbox
{
private void processAndSend(MseBlock block, List<DataBuffer> serializedStats)
throws IOException {
+ // Gated overload: equivalent to the three-arg variant with bypassReady=false (subject to [#awaitReady]).
+ processAndSend(block, serializedStats, false);
+ }
+
+ /// Same as [#processAndSend(MseBlock, List)] but with a flag to bypass the
[#awaitReady] gate. Used by the
+ /// cancel / close paths to push the error EOS through without waiting on
back-pressure relief — without this,
+ /// a cancel issued while the receiver is congested would itself block until
the receiver drains.
+ private void processAndSend(MseBlock block, List<DataBuffer>
serializedStats, boolean bypassReady)
+ throws IOException {
_statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
long start = System.currentTimeMillis();
try {
DataBlock dataBlock = MseBlockSerializer.toDataBlock(block,
serializedStats);
- int sizeInBytes = processAndSend(dataBlock);
+ int sizeInBytes = processAndSend(dataBlock, bypassReady);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Serialized block: {} to {} bytes", block, sizeInBytes);
}
@@ -157,6 +234,11 @@ public class GrpcSendingMailbox implements SendingMailbox {
*/
protected int processAndSend(DataBlock dataBlock)
throws IOException {
+ // Gated overload: delegates with bypassReady=false.
+ // NOTE(review): processAndSend(MseBlock, List, boolean) now calls processAndSend(DataBlock, boolean)
+ // directly, so a subclass that overrides only this protected one-arg method no longer intercepts the
+ // send path. Confirm no subclass/test relies on overriding it.
+ return processAndSend(dataBlock, false);
+ }
+
+ protected int processAndSend(DataBlock dataBlock, boolean bypassReady)
+ throws IOException {
List<ByteString> byteStrings = toByteStrings(dataBlock,
_maxByteStringSize);
int sizeInBytes = 0;
for (ByteString byteString : byteStrings) {
@@ -166,11 +248,29 @@ public class GrpcSendingMailbox implements SendingMailbox
{
while (byteStringIt.hasNext()) {
ByteString byteString = byteStringIt.next();
boolean waitForMore = byteStringIt.hasNext();
- sendContent(byteString, waitForMore);
+ sendContent(byteString, waitForMore, bypassReady);
}
return sizeInBytes;
}
+ /// Cancels this mailbox by pushing an error EOS to the receiver and closing
the gRPC stream.
+ ///
+ /// ## Known limitation: in-band EOS on a stuck receiver
+ ///
+ /// The error EOS is pushed **in-band** on the same gRPC stream as data, via
+ /// [#processAndSend] with `bypassReady=true`. If the receiver's application
queue
+ /// ([org.apache.pinot.query.mailbox.ReceivingMailbox], default capacity 5)
is full and its dispatch
+ /// thread is parked in `_notFull.await`, the EOS sits behind every other
inbound message that already
+ /// made it past gRPC's inbound flow-control window. Worst-case
cancel-propagation latency is bounded by
+ /// the receiver's inbound credit
+ ///
([org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner#KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT])
+ /// and
[org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner#KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES].
+ ///
+ /// This is **pre-existing** behaviour — the hang surface existed even with
gRPC's auto-inbound default of
+ /// 1 in-flight message, and survives the rollback knob
+ ///
[org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner#KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED]
+ /// = `false`. The proper fix is an out-of-band cancel channel; see
+ /// https://github.com/apache/pinot/issues/18541.
@Override
public void cancel(Throwable t) {
if (isTerminated()) {
@@ -178,20 +278,32 @@ public class GrpcSendingMailbox implements SendingMailbox
{
return;
}
_senderSideClosed = true;
+ // Wake any sender thread blocked in awaitReady() so it observes the
termination and exits without racing this
+ // cancel path on the same observer.
+ wakeWaiters();
LOGGER.debug("Cancelling mailbox: {}", _id);
- if (_contentObserver == null) {
- _contentObserver = getContentObserver();
- }
+ // Sender thread may never have created the stream (e.g. cancel arrived
before the first send). Open one now so
+ // the receiver gets an explicit cancel-error EOS instead of waiting for
its own deadline — the receiving
+ // mailbox is registered per the dispatch plan and is blocked on this
stream. Lazy init is load-bearing: an eager
+ // init in the constructor would change cancel-before-first-send semantics
(we would always open a stream even
+ // when send() is never called). Double-checked-lock through _readyLock so
a concurrent first-send + cancel from
+ // two threads do not both call getContentObserver() and leak an orphan
gRPC stream.
+ ensureContentObserverInitialized();
+ // Acquire _readyLock so the error EOS + onCompleted() is one atomic
outbound. ReentrantLock: the inner
+ // sendContent()'s acquisition nests safely.
+ _readyLock.lock();
try {
String msg = t != null ? t.getMessage() : "Unknown";
// NOTE: DO NOT use onError() because it will terminate the stream, and
receiver might not get the callback
MseBlock errorBlock = ErrorMseBlock.fromError(
QueryErrorCode.QUERY_CANCELLATION, "Cancelled by sender with
exception: " + msg);
- processAndSend(errorBlock, List.of());
+ processAndSend(errorBlock, List.of(), /* bypassReady */ true);
_contentObserver.onCompleted();
} catch (Exception e) {
// Exception can be thrown if the stream is already closed, so we simply
ignore it
LOGGER.debug("Caught exception cancelling mailbox: {}", _id, e);
+ } finally {
+ _readyLock.unlock();
}
}
@@ -208,23 +320,204 @@ public class GrpcSendingMailbox implements
SendingMailbox {
return _senderSideClosed || _statusObserver.isFinished();
}
- private StreamObserver<MailboxContent> getContentObserver() {
+ /// Lazily initializes [#_contentObserver] under [#_readyLock] using the
standard double-checked-locking idiom.
+ ///
+ /// Lazy init is load-bearing for cancel-before-first-send semantics: we do
not want to open a gRPC stream in the
+ /// constructor because a mailbox that is cancelled before any [#send] would
then leak an unused stream. Eager init
+ /// is therefore not an option.
+ ///
+ /// The unsynchronized `if (_contentObserver == null) { _contentObserver =
getContentObserver(); }` pattern this
+ /// replaces was unsafe: two threads (e.g. the sender thread doing its first
[#sendInternal] and an external
+ /// canceller running [#cancel]) could both observe `null`, both call
[#getContentObserver], and each open a
+ /// separate gRPC stream for the same mailbox id. The loser would never see
`onCompleted` and the orphan stream
+ /// would only be reclaimed by the gRPC deadline — and the cancel EOS could
land on a different stream than the
+ /// data, silently losing the cancellation signal.
+ ///
+ /// `_contentObserver` is `volatile` so the fast-path check provides the
happens-before edge for the eventual
+ /// observer reads in [#sendContent] / [#cancel].
+ private void ensureContentObserverInitialized() {
+ if (_contentObserver != null) {
+ return;
+ }
+ _readyLock.lock();
+ try {
+ if (_contentObserver == null) {
+ _contentObserver = getContentObserver();
+ }
+ } finally {
+ _readyLock.unlock();
+ }
+ }
+
+ // Package-private to allow regression tests for the lazy-init race to count
how many times a content observer is
+ // opened for a single mailbox.
+ ClientCallStreamObserver<MailboxContent> getContentObserver() {
Metadata metadata = new Metadata();
metadata.put(ChannelUtils.MAILBOX_ID_METADATA_KEY, _id);
- return PinotMailboxGrpc.newStub(_channelManager.getChannel(_hostname,
_port))
+ // We wrap `_statusObserver` in a ClientResponseObserver so we can
register the on-ready handler through
+ // `beforeStart` — gRPC rejects setOnReadyHandler() if it is called after
open() returns. Wrapping (rather than
+ // making MailboxStatusObserver itself a ClientResponseObserver) keeps the
back-pressure plumbing local to this
+ // class. The wrapper delegates the data callbacks unchanged, and signals
our `_readyCond` on stream close so a
+ // blocked sender wakes up to observe `_statusObserver.isFinished()`
becoming true.
+ ClientResponseObserver<MailboxContent, MailboxStatus> responseObserver =
+ new ClientResponseObserver<MailboxContent, MailboxStatus>() {
+ @Override
+ public void beforeStart(ClientCallStreamObserver<MailboxContent>
requestStream) {
+ // Fires on a gRPC channel/Netty thread whenever isReady()
transitions false -> true. Just signal; the
+ // sender re-checks the predicate after waking.
+
requestStream.setOnReadyHandler(GrpcSendingMailbox.this::wakeWaiters);
+ }
+
+ @Override
+ public void onNext(MailboxStatus value) {
+ _statusObserver.onNext(value);
+ // Only wake on receiver early-terminate. Transport-level
isReady() transitions reach a parked
+ // sender through setOnReadyHandler (registered in beforeStart
above); normal buffer-size ACKs
+ // do not change any predicate awaitReady() actually waits on, so
signalling them would force a
+ // spurious park/unpark cycle on every receiver ACK.
Early-terminate is the one status-only
+ // change (the stream stays open) that awaitReady() must observe
promptly, so we still signal
+ // here when its metadata is set.
+ if (Boolean.parseBoolean(
+
value.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE)))
{
+ wakeWaiters();
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ try {
+ _statusObserver.onError(t);
+ } finally {
+ wakeWaiters();
+ }
+ }
+
+ @Override
+ public void onCompleted() {
+ try {
+ _statusObserver.onCompleted();
+ } finally {
+ wakeWaiters();
+ }
+ }
+ };
+
+ return (ClientCallStreamObserver<MailboxContent>) PinotMailboxGrpc.newStub(
+ _channelManager.getChannel(_hostname, _port))
.withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
.withDeadlineAfter(_deadlineMs - System.currentTimeMillis(),
TimeUnit.MILLISECONDS)
- .open(_statusObserver);
+ .open(responseObserver);
}
protected void sendContent(ByteString byteString, boolean waitForMore) {
- MailboxContent content = MailboxContent.newBuilder()
- .setMailboxId(_id)
- .setPayload(byteString)
- .setWaitForMore(waitForMore)
- .build();
- _contentObserver.onNext(content);
+ sendContent(byteString, waitForMore, false);
+ }
+
+ protected void sendContent(ByteString byteString, boolean waitForMore,
boolean bypassReady) {
+ if (!awaitReady(bypassReady)) {
+ // Either the mailbox was cancelled while we were waiting (normal path)
or the gRPC stream is already dead
+ // (bypass path). Either way, skip the send.
+ return;
+ }
+ // _readyLock is the serialization point for outbound observer calls. Hold
it across the isTerminated() re-check
+ // and the onNext() so the data path cannot race with cancel() / send(Eos)
onto the same non-thread-safe
+ // ClientCallStreamObserver. awaitReady() is intentionally outside the
lock: its slow path already acquires
+ // _readyLock to wait on _readyCond, and acquiring before calling it would
force the fast `isReady() == true`
+ // path to take and release the lock for no benefit. By the time
awaitReady() returns true, the slow-path
+ // lock release happens-before this acquisition, so the visibility we need
is in place.
+ _readyLock.lock();
+ try {
+ if (!bypassReady && isTerminated()) {
+ return;
+ }
+ MailboxContent content = MailboxContent.newBuilder()
+ .setMailboxId(_id)
+ .setPayload(byteString)
+ .setWaitForMore(waitForMore)
+ .build();
+ _contentObserver.onNext(content);
+ } finally {
+ _readyLock.unlock();
+ }
+ }
+
+ /// Blocks the calling (query-runner) thread until the gRPC client outbound
is ready to accept another chunk, the
+ /// mailbox terminates, or the query deadline is exceeded. Returns `true` if
the caller should proceed with the
+ /// `onNext` call, `false` if the send should be skipped.
+ ///
+ /// Two modes:
+ /// * `bypassReady = false` (normal user sends): waits for
[#_contentObserver]`.isReady()` to flip, but exits
+ /// early if the mailbox has been [#isTerminated terminated] in the
meantime. This makes [#cancel] able to
+ /// unblock a blocked sender promptly.
+ /// * `bypassReady = true` (the cancel / close paths): never waits, never
short-circuits on `isTerminated()`.
+ /// The only check is whether the underlying gRPC stream is already dead
([MailboxStatusObserver#isFinished]),
+ /// in which case there is nothing to send. This is what lets a cancel
issued while the receiver is congested
+ /// push its error EOS through without blocking behind back-pressure of
its own.
+ ///
+ /// Spool note: when a stage spools to multiple destination mailboxes via
[BlockExchange.BlockExchangeSendingMailbox],
+ /// one slow downstream worker will throttle the whole spool — every wrapped
mailbox is awaited in turn from the same
+ /// OpChain thread. This is intentional: the alternative (forking each
destination onto its own thread) would
+ /// re-introduce the unbounded outbound queue we are fixing here. Spool
throughput is gated by the slowest consumer.
+ private boolean awaitReady(boolean bypassReady) {
+ if (bypassReady || !_backpressureEnabled) {
+ return !_statusObserver.isFinished();
+ }
+ // Fast path: don't take the lock if the observer is already ready. This
is the common case in the steady state.
+ if (_contentObserver.isReady()) {
+ return true;
+ }
+ if (isTerminated() || isEarlyTerminated()) {
+ return false;
+ }
+ _readyLock.lock();
+ try {
+ while (!_contentObserver.isReady()) {
+ if (isTerminated() || isEarlyTerminated()) {
+ return false;
+ }
+ // Cooperative termination poll so query cancellation can unblock the
wait through the same mechanism we use
+ // elsewhere in the MSE.
+ QueryThreadContext.checkTerminationAndSampleUsage(SEND_SCOPE);
+ long remainingMs = _deadlineMs - System.currentTimeMillis();
+ if (remainingMs <= 0) {
+ throw new QueryException(QueryErrorCode.EXECUTION_TIMEOUT,
+ "Deadline exceeded while waiting for gRPC outbound to become
ready on mailbox: " + _id);
+ }
+ try {
+ _readyCond.await(remainingMs, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new QueryException(QueryErrorCode.INTERNAL,
+ "Interrupted while waiting for gRPC outbound to become ready on
mailbox: " + _id, e);
+ }
+ }
+ return true;
+ } finally {
+ _readyLock.unlock();
+ }
+ }
+
+ /// Wakes every waiter in [#awaitReady]. Called from the gRPC ready handler,
from [MailboxStatusObserver] events,
+ /// and from [#cancel] / [#close]. Must be cheap because it can fire from
Netty event-loop threads.
+ private void wakeWaiters() {
+ // Lock-from-event-loop pattern, intentional and load-bearing. This method
can fire from a Netty channel /
+ // gRPC event-loop thread via the `setOnReadyHandler` registered in
`beforeStart`, as well as from the
+ // `MailboxStatusObserver` callbacks (`onNext`, `onError`, `onCompleted`)
— all of which run on gRPC
+ // executor threads. Acquiring `_readyLock` here therefore briefly blocks
the event-loop thread if a sender is
+ // mid-`onNext` / `onCompleted` under the lock. That is accepted because:
+ // (a) lock-held outbound critical sections are bounded by a single
`_contentObserver.onNext` or
+ // `onCompleted` call — the lock is never held across an
`awaitReady()` wait, an I/O wait, or any other
+ // blocking operation, so the event-loop stall is at most one
observer call;
+ // (b) the obvious alternative — defer the signal to a separate executor
— adds latency to every wakeup
+ // and an extra thread hop on the hot back-pressure unblock path,
which matters in practice more than
+ // the rare event-loop stall this design accepts.
+ _readyLock.lock();
+ try {
+ _readyCond.signalAll();
+ } finally {
+ _readyLock.unlock();
+ }
}
@Override
@@ -334,15 +627,42 @@ public class GrpcSendingMailbox implements SendingMailbox
{
public void close()
throws Exception {
if (!isTerminated()) {
- String errorMsg = "Closing gPRC mailbox without proper EOS message";
+ String errorMsg = "Closing gRPC mailbox without proper EOS message";
RuntimeException ex = new RuntimeException(errorMsg);
ex.fillInStackTrace();
LOGGER.error(errorMsg, ex);
_senderSideClosed = true;
+ wakeWaiters();
+
+ // Short-circuit when the sender never opened a stream. close() is the
"should not happen" cleanup
+ // fallback — for query stages pruned before any data flows the mailbox
is constructed, never sent on,
+ // and then closed. Pre-PR this was a silent no-op on the wire; commit
aeaacc893d regressed that by
+ // routing through ensureContentObserverInitialized() to push an error
EOS, which opened a fresh gRPC
+ // stream and half-closed it (three wasted round-trips, plus a new
exception surface if channel open
+ // throws during shutdown). cancel() does NOT short-circuit here because
it has a load-bearing reason
+ // to open the stream: there may be a receiver-side reader blocked
waiting for the cancel signal. close()
+ // has no such urgency. The volatile read of _contentObserver here
happens-before the lock acquire below,
+ // so no additional fence is needed.
+ ClientCallStreamObserver<MailboxContent> observer = _contentObserver;
+ if (observer == null) {
+ return;
+ }
MseBlock errorBlock = ErrorMseBlock.fromError(QueryErrorCode.INTERNAL,
errorMsg);
- if (_contentObserver != null) {
- processAndSend(errorBlock, List.of());
+ // _readyLock serialises outbound observer calls so the error EOS +
onCompleted() is one atomic outbound,
+ // same shape as cancel(). Without the onCompleted() the gRPC client
stream stayed half-open from the sender
+ // side until the per-call deadline fired — for long-running MSE queries
that's a multi-minute allocator pin
+ // on the channel, which is exactly the leak the back-pressure work is
trying to fix.
+ _readyLock.lock();
+ try {
+ processAndSend(errorBlock, List.of(), /* bypassReady */ true);
+ observer.onCompleted();
+ } catch (Exception e) {
+ // Stream may already be half-closed from a racing send(Eos) / cancel
that we lost to under the lock.
+ // Symmetric to the defensive catch in cancel() — duplicate half-close
is benign here.
+ LOGGER.debug("Caught exception closing mailbox: {}", _id, e);
+ } finally {
+ _readyLock.unlock();
}
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index 73fd1379ac4..3aa8924009e 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.query.mailbox;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
@@ -30,6 +31,10 @@ import java.util.function.Function;
import javax.annotation.Nullable;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.utils.grpc.ServerGrpcQueryClient;
import org.apache.pinot.core.instance.context.BrokerContext;
import org.apache.pinot.core.instance.context.ControllerContext;
@@ -96,6 +101,7 @@ public class MailboxService {
* bloating is added by gRPC and protobuf.
*/
private final int _maxInboundMessageSize;
+ private final boolean _grpcSenderBackpressureEnabled;
private GrpcMailboxServer _grpcMailboxServer;
@@ -121,11 +127,64 @@ public class MailboxService {
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
);
- _channelManager = new ChannelManager(_clientSslContext,
_maxInboundMessageSize, getIdleTimeout(config));
+ _grpcSenderBackpressureEnabled = config.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_SENDER_BACKPRESSURE_ENABLED
+ );
+ int writeBufferHighWaterMarkBytes = config.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES);
+ int writeBufferLowWaterMarkBytes = config.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES);
+ _channelManager = new ChannelManager(_clientSslContext,
_maxInboundMessageSize, getIdleTimeout(config),
+ writeBufferHighWaterMarkBytes, writeBufferLowWaterMarkBytes);
_accessControlFactory = accessControlFactory;
+ registerMailboxClientGauges();
LOGGER.info("Initialized MailboxService with hostname: {}, port: {}",
hostname, port);
}
+ /// Registers gauges exposing the memory used by the gRPC client allocator
+ /// shared by every [GrpcSendingMailbox] this service creates. The companion
+ /// gauges for the server allocator are registered in [GrpcMailboxServer].
+ ///
+ /// Notice we are wiring the shaded gRPC Netty allocator
+ /// ([io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator]) rather
than
+ /// the non-shaded one.
+ ///
+ /// Only the two `MAILBOX_CLIENT_USED_*` gauges are mirrored on the client
side;
+ /// the six debug-only counterparts exposed by the server
(`MAILBOX_SERVER_ARENAS_*`,
+ /// `MAILBOX_SERVER_CACHE_SIZE_*`, `MAILBOX_SERVER_THREADLOCALCACHE`,
+ /// `MAILBOX_SERVER_CHUNK_SIZE`) are intentionally not mirrored here. The
operational
+ /// signal that matters on the client is direct-memory pool exhaustion (the
OOM that
+ /// motivated wiring these gauges in the first place); the rest are
server-side
+ /// allocator-internals useful only for debugging. If you ever need to
mirror the
+ /// remaining six, copy the registration block from [GrpcMailboxServer]'s
constructor.
+ private void registerMailboxClientGauges() {
+ switch (_instanceType) {
+ case BROKER: {
+ BrokerMetrics brokerMetrics = BrokerMetrics.get();
+
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_CLIENT_USED_DIRECT_MEMORY,
+ _channelManager::usedDirectMemoryBytes);
+
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_CLIENT_USED_HEAP_MEMORY,
+ _channelManager::usedHeapMemoryBytes);
+ break;
+ }
+ case SERVER: {
+ ServerMetrics serverMetrics = ServerMetrics.get();
+
serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_CLIENT_USED_DIRECT_MEMORY,
+ _channelManager::usedDirectMemoryBytes);
+
serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_CLIENT_USED_HEAP_MEMORY,
+ _channelManager::usedHeapMemoryBytes);
+ break;
+ }
+ default:
+ // Controller does not run a MailboxService in production today, but
if one is ever
+ // started for tests we silently skip metric registration rather than
failing.
+ break;
+ }
+ }
+
/**
* Starts the mailbox service.
*/
@@ -160,13 +219,16 @@ public class MailboxService {
* not open the underlying channel or acquire any additional resources.
Instead, it will initialize lazily when the
* data is sent for the first time.
*/
+ // TODO(https://github.com/apache/pinot/issues/18539): add a global byte
budget across peers; the
+ // per-channel WriteBufferWaterMark bound is watermark.high × #peers ×
#concurrent_queries, which can
+ // reach multiple GiB and re-trigger OutOfDirectMemoryError at large
fan-outs.
public SendingMailbox getSendingMailbox(String hostname, int port, String
mailboxId, long deadlineMs,
StatMap<MailboxSendOperator.StatKey> statMap) {
if (_hostname.equals(hostname) && _port == port) {
return new InMemorySendingMailbox(mailboxId, this, deadlineMs, statMap);
} else {
return new GrpcSendingMailbox(mailboxId, _channelManager, hostname,
port, deadlineMs, statMap,
- _maxInboundMessageSize);
+ _maxInboundMessageSize, _grpcSenderBackpressureEnabled);
}
}
@@ -190,6 +252,41 @@ public class MailboxService {
return _channelManager.resetConnectBackoff(hostname, port);
}
+ /// Current value of [BrokerGauge#MAILBOX_CLIENT_USED_DIRECT_MEMORY] /
+ /// [ServerGauge#MAILBOX_CLIENT_USED_DIRECT_MEMORY] — bytes pinned by the
+ /// shared gRPC client allocator backing every [GrpcSendingMailbox] created
+ /// from this service.
+ @VisibleForTesting
+ public long getMailboxClientUsedDirectMemoryBytes() {
+ return _channelManager.usedDirectMemoryBytes();
+ }
+
+ /// Current value of [BrokerGauge#MAILBOX_CLIENT_USED_HEAP_MEMORY] /
+ /// [ServerGauge#MAILBOX_CLIENT_USED_HEAP_MEMORY].
+ @VisibleForTesting
+ public long getMailboxClientUsedHeapMemoryBytes() {
+ return _channelManager.usedHeapMemoryBytes();
+ }
+
+ /// Current value of [BrokerGauge#MAILBOX_SERVER_USED_DIRECT_MEMORY] /
+ /// [ServerGauge#MAILBOX_SERVER_USED_DIRECT_MEMORY] — bytes pinned by the
+ /// gRPC server allocator handling inbound mailbox traffic.
+ ///
+ /// Returns 0 before [#start] is called (the gRPC server is built lazily
there).
+ @VisibleForTesting
+ public long getMailboxServerUsedDirectMemoryBytes() {
+ return _grpcMailboxServer != null ?
_grpcMailboxServer.usedDirectMemoryBytes() : 0L;
+ }
+
+ /// Current value of [BrokerGauge#MAILBOX_SERVER_USED_HEAP_MEMORY] /
+ /// [ServerGauge#MAILBOX_SERVER_USED_HEAP_MEMORY].
+ ///
+ /// Returns 0 before [#start] is called.
+ @VisibleForTesting
+ public long getMailboxServerUsedHeapMemoryBytes() {
+ return _grpcMailboxServer != null ?
_grpcMailboxServer.usedHeapMemoryBytes() : 0L;
+ }
+
/**
* Releases the receiving mailbox from the cache.
*
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
index bdccf59a6bf..f7567fe1951 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
@@ -18,11 +18,13 @@
*/
package org.apache.pinot.query.mailbox.channel;
+import com.google.common.base.Preconditions;
import io.grpc.ConnectivityState;
import io.grpc.ManagedChannel;
import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator;
import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
+import io.grpc.netty.shaded.io.netty.channel.WriteBufferWaterMark;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
@@ -62,6 +64,7 @@ public class ChannelManager {
private final PooledByteBufAllocator _bufAllocator;
@Nullable
private final SslContext _clientSslContext;
+ private final WriteBufferWaterMark _writeBufferWaterMark;
/**
* Constructs a {@code ChannelManager}.
@@ -69,11 +72,26 @@ public class ChannelManager {
* @param clientSslContext optional cached client {@link SslContext} to
reuse across channels
* @param maxInboundMessageSize maximum inbound message size for gRPC
channels
* @param idleTimeout idle timeout for gRPC channels; channels close after
this period of inactivity
+ * @param writeBufferHighWaterMarkBytes Netty per-channel {@link
WriteBufferWaterMark} high watermark. This limit is
+ * per {@code (host, port)} peer and is
shared across all streams multiplexed on
+ * that channel.
+ * @param writeBufferLowWaterMarkBytes Netty per-channel {@link
WriteBufferWaterMark} low mark. Once the channel's
+ * pending write queue grows above the
high watermark, the channel is marked
+ * unwritable; it becomes writable again
only when the queue drains below this
+ * low watermark. Must satisfy {@code 0
< low ≤ high}; validated eagerly here so
+ * misconfiguration surfaces at startup
rather than on the first query.
*/
- public ChannelManager(@Nullable SslContext clientSslContext, int
maxInboundMessageSize, Duration idleTimeout) {
+ public ChannelManager(@Nullable SslContext clientSslContext, int
maxInboundMessageSize, Duration idleTimeout,
+ int writeBufferHighWaterMarkBytes, int writeBufferLowWaterMarkBytes) {
_clientSslContext = clientSslContext;
_maxInboundMessageSize = maxInboundMessageSize;
_idleTimeout = idleTimeout;
+ Preconditions.checkArgument(writeBufferLowWaterMarkBytes > 0,
+ "writeBufferLowWaterMarkBytes must be positive, got: %s",
writeBufferLowWaterMarkBytes);
+ // The `low <= high` (and `low >= 0`) invariant is also checked by Netty's
WriteBufferWaterMark constructor; by
+ // constructing the watermark eagerly here we surface any violation at
startup instead of on the first send to
+ // a previously-unseen peer.
+ _writeBufferWaterMark = new
WriteBufferWaterMark(writeBufferLowWaterMarkBytes,
writeBufferHighWaterMarkBytes);
// Use direct buffers (off-heap) for better performance - matches
server-side configuration
_bufAllocator = new PooledByteBufAllocator(true);
}
@@ -86,6 +104,7 @@ public class ChannelManager {
.forAddress(k.getLeft(), k.getRight())
.maxInboundMessageSize(_maxInboundMessageSize)
.withOption(ChannelOption.ALLOCATOR, _bufAllocator)
+ .withOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
_writeBufferWaterMark)
.sslContext(_clientSslContext);
return decorate(channelBuilder).build();
}
@@ -97,6 +116,7 @@ public class ChannelManager {
.forAddress(k.getLeft(), k.getRight())
.maxInboundMessageSize(_maxInboundMessageSize)
.withOption(ChannelOption.ALLOCATOR, _bufAllocator)
+ .withOption(ChannelOption.WRITE_BUFFER_WATER_MARK,
_writeBufferWaterMark)
.usePlaintext();
return decorate(channelBuilder).build();
});
@@ -122,4 +142,20 @@ public class ChannelManager {
private NettyChannelBuilder decorate(NettyChannelBuilder builder) {
return builder.idleTimeout(_idleTimeout.getSeconds(), TimeUnit.SECONDS);
}
+
+ /// Bytes of direct (off-heap) memory currently pinned by the shared gRPC
+ /// client allocator. Covers every channel managed by this instance and
remains
+ /// meaningful regardless of whether Netty is configured to prefer direct or
+ /// heap buffers (e.g. `-Dio.netty.noPreferDirect=true`). Consumed by
+ /// [MailboxService] to register the `MAILBOX_CLIENT_USED_DIRECT_MEMORY`
gauge.
+ public long usedDirectMemoryBytes() {
+ return _bufAllocator.metric().usedDirectMemory();
+ }
+
+ /// Bytes of heap memory currently pinned by the shared gRPC client
allocator.
+ /// Consumed by [MailboxService] to register the
+ /// `MAILBOX_CLIENT_USED_HEAP_MEMORY` gauge.
+ public long usedHeapMemoryBytes() {
+ return _bufAllocator.metric().usedHeapMemory();
+ }
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
index 2a9264ceada..893c4b8d212 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
@@ -26,6 +26,7 @@ import
io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocatorMetric;
import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent;
+import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
@@ -44,6 +45,8 @@ import org.apache.pinot.query.mailbox.MailboxService;
import org.apache.pinot.spi.config.instance.InstanceType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -53,10 +56,15 @@ import org.apache.pinot.spi.utils.CommonConstants;
* send by the sender of the sender/receiver pair.
*/
public class GrpcMailboxServer extends PinotMailboxGrpc.PinotMailboxImplBase {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(GrpcMailboxServer.class);
private static final long DEFAULT_SHUTDOWN_TIMEOUT_MS = 10_000L;
private final MailboxService _mailboxService;
private final Server _server;
+ private final PooledByteBufAllocatorMetric _bufAllocatorMetric;
+ private final int _flowControlWindowBytes;
+ private final int _inboundMessageCredit;
+ private final boolean _manualInboundFlowControlEnabled;
/**
* Constructs a gRPC-based mailbox server.
@@ -79,6 +87,7 @@ public class GrpcMailboxServer extends
PinotMailboxGrpc.PinotMailboxImplBase {
PooledByteBufAllocator bufAllocator = new PooledByteBufAllocator(true);
PooledByteBufAllocatorMetric metric = bufAllocator.metric();
+ _bufAllocatorMetric = metric;
// Register memory metrics based on instance type
InstanceType instanceType = mailboxService.getInstanceType();
@@ -132,13 +141,35 @@ public class GrpcMailboxServer extends
PinotMailboxGrpc.PinotMailboxImplBase {
if (accessControlFactory != null) {
builder.intercept(new AuthorizationInterceptor(accessControlFactory));
}
+ _flowControlWindowBytes = config.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_FLOW_CONTROL_WINDOW_BYTES);
+ _inboundMessageCredit = config.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_INBOUND_MESSAGE_CREDIT);
+ _manualInboundFlowControlEnabled = config.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED);
+ int maxInboundMessageSize = config.getProperty(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES);
+ Preconditions.checkArgument(_inboundMessageCredit > 0,
+ "%s must be positive, got: %s",
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT,
+ _inboundMessageCredit);
+ // A window smaller than the largest possible single message makes the
stream pathological: the sender's
+ // isReady() flaps on every message, since no single message can ever fit
in the available credit. Fail fast
+ // at startup rather than letting it manifest as flapping back-pressure
mid-query.
+ Preconditions.checkArgument(_flowControlWindowBytes >=
maxInboundMessageSize,
+ "%s (%s) must be >= %s (%s)",
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES,
_flowControlWindowBytes,
+
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
maxInboundMessageSize);
builder
.addService(this)
.withOption(ChannelOption.ALLOCATOR, bufAllocator)
.withChildOption(ChannelOption.ALLOCATOR, bufAllocator)
- .maxInboundMessageSize(config.getProperty(
-
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
-
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES));
+ .maxInboundMessageSize(maxInboundMessageSize)
+ .flowControlWindow(_flowControlWindowBytes);
// Add SSL context only if TLS is configured
if (tlsConfig != null) {
@@ -151,6 +182,9 @@ public class GrpcMailboxServer extends
PinotMailboxGrpc.PinotMailboxImplBase {
}
public void start() {
+ LOGGER.info("Starting GrpcMailboxServer with flowControlWindow={} bytes,
inboundMessageCredit={}, "
+ + "manualInboundFlowControlEnabled={}",
+ _flowControlWindowBytes, _inboundMessageCredit,
_manualInboundFlowControlEnabled);
try {
_server.start();
} catch (IOException e) {
@@ -166,9 +200,35 @@ public class GrpcMailboxServer extends
PinotMailboxGrpc.PinotMailboxImplBase {
}
}
+ /// Bytes of direct (off-heap) memory currently pinned by the gRPC server
+ /// allocator backing this mailbox server. This is the same allocator whose
+ /// values are exported as `MAILBOX_SERVER_USED_DIRECT_MEMORY` gauges.
+ public long usedDirectMemoryBytes() {
+ return _bufAllocatorMetric.usedDirectMemory();
+ }
+
+ /// Bytes of heap memory currently pinned by the gRPC server allocator
backing
+ /// this mailbox server. Exported as `MAILBOX_SERVER_USED_HEAP_MEMORY`
gauges.
+ public long usedHeapMemoryBytes() {
+ return _bufAllocatorMetric.usedHeapMemory();
+ }
+
@Override
public StreamObserver<Mailbox.MailboxContent>
open(StreamObserver<Mailbox.MailboxStatus> responseObserver) {
String mailboxId = ChannelUtils.MAILBOX_ID_CTX_KEY.get();
- return new MailboxContentObserver(_mailboxService, mailboxId,
responseObserver);
+ ServerCallStreamObserver<Mailbox.MailboxStatus> serverCallObserver =
+ (ServerCallStreamObserver<Mailbox.MailboxStatus>) responseObserver;
+ if (_manualInboundFlowControlEnabled) {
+ // Manual inbound flow control: override gRPC's auto-inbound (which
calls request(1) after each
+ // onNext) and prefetch _inboundMessageCredit messages up-front.
MailboxContentObserver.onNext will
+ // then replenish one credit at the top of each call so the in-flight
window stays full while the
+ // application drains. This is the primary throughput knob for
small/medium MSE blocks.
+ serverCallObserver.disableAutoInboundFlowControl();
+ serverCallObserver.request(_inboundMessageCredit);
+ }
+ // Else: leave gRPC's auto-inbound in place — only 1 message in flight at
a time. This is the pre-PR
+ // behaviour, retained as a rollback knob via
KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED.
+ return new MailboxContentObserver(_mailboxService, mailboxId,
serverCallObserver,
+ _manualInboundFlowControlEnabled);
}
}
diff --git
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
index 9bb33e2f918..e5088bc8c47 100644
---
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
+++
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.mailbox.channel;
import io.grpc.Context;
+import io.grpc.stub.ServerCallStreamObserver;
import io.grpc.stub.StreamObserver;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -47,21 +48,35 @@ public class MailboxContentObserver implements
StreamObserver<MailboxContent> {
private static final Logger LOGGER =
LoggerFactory.getLogger(MailboxContentObserver.class);
private final MailboxService _mailboxService;
- private final StreamObserver<MailboxStatus> _responseObserver;
+ private final ServerCallStreamObserver<MailboxStatus> _responseObserver;
+ private final boolean _manualInboundFlowControlEnabled;
private final List<ByteBuffer> _mailboxBuffers =
Collections.synchronizedList(new ArrayList<>());
private boolean _closedStream = false;
private volatile ReceivingMailbox _mailbox;
public MailboxContentObserver(MailboxService mailboxService, String
mailboxId,
- StreamObserver<MailboxStatus> responseObserver) {
+ ServerCallStreamObserver<MailboxStatus> responseObserver, boolean
manualInboundFlowControlEnabled) {
_mailboxService = mailboxService;
_responseObserver = responseObserver;
+ _manualInboundFlowControlEnabled = manualInboundFlowControlEnabled;
_mailbox = StringUtils.isNotBlank(mailboxId) ?
_mailboxService.getReceivingMailbox(mailboxId) : null;
}
@Override
public void onNext(MailboxContent mailboxContent) {
+ if (_manualInboundFlowControlEnabled) {
+ // Replenish one inbound-message credit immediately, before any work
that might block (e.g., the
+ // offerData lock acquisition inside _mailbox.offerRaw). This decouples
the sender's HTTP/2 window
+ // replenishment from the receiver's per-message processing time — gRPC
will issue the WINDOW_UPDATE
+ // for this message as soon as this request(1) call sets the credit, not
waiting for onNext to return.
+ // Do not move this below the blocking _mailbox.offerRaw call — it must
replenish credit
+ // BEFORE the offer so the receiver doesn't gate the sender on
per-message application drain time.
+ _responseObserver.request(1);
+ }
+ // Else: gRPC's auto-inbound is in place and will automatically replenish
1 credit after onNext
+ // returns. Do not call request(1) here in that mode — it would
double-count the credit and break
+ // the 1-in-flight semantics that the rollback knob restores.
if (_closedStream) {
LOGGER.debug("Received a late message once the stream was closed.
Ignoring it.");
return;
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureOffPathTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureOffPathTest.java
new file mode 100644
index 00000000000..43ff4bb63b8
--- /dev/null
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureOffPathTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+/// Pins the off-path through `awaitReady`, which is also the production
default for
+/// `pinot.query.runner.grpc.sender.backpressure.enabled` (see
+///
[org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner#DEFAULT_GRPC_SENDER_BACKPRESSURE_ENABLED]).
+///
+/// ## What this test checks
+///
+/// When the flag is set to `false` (the default),
[GrpcSendingMailbox#awaitReady] must short-circuit
+/// immediately — i.e. the `!_backpressureEnabled` branch in the guard must
fire — so the sender pushes
+/// blocks without waiting for the receiver to drain them. Under that
condition a fast sender can push
+/// orders of magnitude more blocks than a slow receiver polls in the same
wall-clock window.
+///
+/// ## Regression risk
+///
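+/// If someone accidentally removes or inverts the `bypassReady || !_backpressureEnabled` short-circuit
+/// in `awaitReady`, the gate would become permanently active and this flag would have no effect. The
+/// intent is that the sender would then be held much closer to the receiver's pace, causing the
+/// `sendCount > polledCount * 10` assertion to fail and surfacing the regression in CI.
+///
+/// Caveat: with the wide transport defaults used here, the opt-in companion
+/// [GrpcSenderBackpressureTest] reports a `sendCount / polledCount` ratio of roughly 2200× even with
+/// the gate enabled, so a wrongly-active gate may still clear this 10× floor.
+/// [GrpcSenderBackpressureTightGateTest] narrows the transport precisely so the gate's effect becomes
+/// visible.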
+///
+/// ## Relation to the companion tests
+///
+/// [GrpcSenderBackpressureTest] verifies the opt-in (`enabled = true`) path with wide transport
+/// defaults — that the sender stays bounded, although at those sizes transport-level back-pressure
+/// does most of the work. [GrpcSenderBackpressureTightGateTest] does the same with narrow transport
+/// so the gate is the dominant back-pressure mechanism. This test verifies the off path — that
+/// turning the gate off (or leaving it at the production default) actually removes it. Together
+/// they pin both sides of the boolean.
+public class GrpcSenderBackpressureOffPathTest {
+ private static final DataSchema SCHEMA = new DataSchema(
+ new String[]{"payload"}, new ColumnDataType[]{ColumnDataType.STRING});
+ // Same small payload as the companion test — enough to have a real
serialized
+ // body but small enough not to exhaust direct memory within the 3-second
budget.
+ private static final String PAYLOAD = "x".repeat(128);
+ private static final long SEND_BUDGET_NS = TimeUnit.SECONDS.toNanos(3);
+ private static final long READER_POLL_INTERVAL_MS = 20;
+
+ private MailboxService _senderService;
+ private MailboxService _receiverService;
+
+ @BeforeClass
+ public void setUp() {
+ // Disable the back-pressure gate so awaitReady short-circuits
unconditionally.
+ PinotConfiguration config = new PinotConfiguration(Map.of(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED,
false));
+ _senderService = new MailboxService("localhost",
QueryTestUtils.getAvailablePort(),
+ InstanceType.SERVER, config);
+ _senderService.start();
+ _receiverService = new MailboxService("localhost",
QueryTestUtils.getAvailablePort(),
+ InstanceType.SERVER, config);
+ _receiverService.start();
+ }
+
+ @AfterClass
+ public void tearDown() {
+ _senderService.shutdown();
+ _receiverService.shutdown();
+ }
+
+ @Test
+ public void killSwitchDisablesBackpressureGate()
+ throws Exception {
+ String mailboxId = MailboxIdUtils.toMailboxId(1, 1, 0, 0, 0);
+ long deadlineMs = System.currentTimeMillis() +
TimeUnit.MINUTES.toMillis(5);
+ StatMap<MailboxSendOperator.StatKey> stats =
+ new StatMap<>(MailboxSendOperator.StatKey.class);
+
+ SendingMailbox sender = _senderService.getSendingMailbox(
+ "localhost", _receiverService.getPort(), mailboxId, deadlineMs, stats);
+ ReceivingMailbox receiver =
_receiverService.getReceivingMailbox(mailboxId);
+ receiver.registeredReader(() -> { });
+
+ AtomicBoolean stop = new AtomicBoolean(false);
+ AtomicLong polled = new AtomicLong();
+ Thread slowReader = new Thread(() -> {
+ while (!stop.get()) {
+ ReceivingMailbox.MseBlockWithStats msg = receiver.poll();
+ if (msg != null && !msg.getBlock().isEos()) {
+ polled.incrementAndGet();
+ }
+ sleepQuiet(READER_POLL_INTERVAL_MS);
+ }
+ }, "slow-reader");
+ slowReader.setDaemon(true);
+ slowReader.start();
+
+ // Same instance, sent over and over.
+ RowHeapDataBlock block = OperatorTestUtil.block(SCHEMA, new
Object[]{PAYLOAD});
+
+ int sendCount = 0;
+
+ // Watchdog: with the gate off the sender races ahead without blocking; we still
+ // need a watchdog to cap wall-clock time and prevent the loop from filling
+ // direct memory before the test completes. Cancelling the sender is what makes
+ // isTerminated() return true and ends the send loop; it would also wake any thread
+ // parked inside awaitReady, though none should be with the gate off.
+ ScheduledExecutorService watchdog =
Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "test-budget-watchdog");
+ t.setDaemon(true);
+ return t;
+ });
+ watchdog.schedule(() -> sender.cancel(new RuntimeException("test budget
elapsed")),
+ SEND_BUDGET_NS, TimeUnit.NANOSECONDS);
+
+ try (QueryThreadContext ctx = QueryThreadContext.openForMseTest()) {
+ while (!sender.isTerminated()) {
+ sender.send(block);
+ sendCount++;
+ }
+ } finally {
+ watchdog.shutdownNow();
+ }
+
+ stop.set(true);
+ slowReader.join(TimeUnit.SECONDS.toMillis(10));
+
+ long polledCount = polled.get();
+
+ System.out.printf(Locale.ROOT,
+ "[GrpcSenderBackpressureOffPathTest] sent=%d polled=%d ratio=%.1fx%n",
+ sendCount, polledCount,
+ polledCount == 0 ? Double.POSITIVE_INFINITY : (double) sendCount /
polledCount);
+
+ // The sender must have pushed far more blocks than the slow receiver could
+ // poll. At 20 ms polling intervals over a 3-second budget the receiver sees
+ // roughly 150 polls; with the gate off the sender typically reaches hundreds
+ // of thousands of sends. A 10x ratio is an extremely conservative floor.
+ // NOTE(review): GrpcSenderBackpressureTest reports a ratio of ~2200x with the
+ // gate enabled under the same wide transport defaults (plus manual inbound flow
+ // control), so an accidentally active gate would not necessarily drop the ratio
+ // below 10x here; this assertion mainly catches the sender being throttled to
+ // roughly the receiver's pace.
+ assertTrue(sendCount > polledCount * 10,
+ "Expected the kill-switch to let the sender massively outrpace the
receiver, but "
+ + "sendCount=" + sendCount + " polledCount=" + polledCount
+ + " ratio=" + (polledCount == 0 ? "∞" : sendCount / polledCount)
+ + ". This may indicate the backpressure gate is still active
despite the flag being false.");
+ }
+
+ private static void sleepQuiet(long ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTest.java
new file mode 100644
index 00000000000..e80263b286e
--- /dev/null
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTest.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+/// Validates that sender-side gRPC back-pressure keeps a fast sender roughly
in step with a slow receiver.
+///
+/// A "fast sender" pushes the same small data block repeatedly on the test
thread while a "slow reader"
+/// thread polls the receiving mailbox at roughly 50 blocks per second. With
back-pressure in place, the
+/// sender thread blocks inside [GrpcSendingMailbox.awaitReady] whenever the
gRPC outbound queue fills,
+/// so the send rate tracks the polling rate plus a bounded in-flight pipeline.
+///
+/// With the new transport defaults (64 MB HTTP/2 flow-control window and 64
MB Netty write-buffer
+/// high-water mark), both the gRPC stream window and the Netty WriteQueue can
buffer far more data
+/// before signalling back-pressure. As a result,
[GrpcSendingMailbox.awaitReady] rarely fires the
+/// application-level gate during a 3-second run; the back-pressure that
*does* apply is transport-level
+/// (the kernel's TCP send buffer and the receiver's gRPC server read loop).
This means the ratio of
+/// `sendCount` to `polledCount` is much larger than with the old
narrow-window defaults, and the test
+/// now exercises that transport-level back-pressure rather than the
application-level gate.
+///
+/// The test asserts two complementary properties:
+/// 1. `sendCount` is bounded by a generous multiple of `polledCount` —
without any back-pressure the
+/// ratio would be orders of magnitude higher as the sender exhausts
direct memory. The observed
+/// ratio with the new defaults is around 2200×; the threshold is set to
~3× that to give headroom
+/// for hardware variation while still catching complete removal of all
back-pressure.
+/// 2. The peak growth of the sender's client allocator stays under a
generous cap — the wider
+/// in-flight pipeline justified by the larger transport defaults means
more data can be in-flight
+/// at any moment, so the cap is calibrated to ~3× the observed peak
growth (~8 MB).
+///
+/// The thresholds are intentionally loose: this is a regression guard against
all back-pressure being
+/// silently removed, not a precise performance SLA.
+public class GrpcSenderBackpressureTest {
+ private static final DataSchema SCHEMA = new DataSchema(
+ new String[]{"payload"}, new ColumnDataType[]{ColumnDataType.STRING});
+ // Small but not empty — enough that each MailboxContent has a real payload
+ // but small enough that 100k of them comfortably fit in JVM direct memory on
+ // a CI machine.
+ private static final String PAYLOAD = "x".repeat(128);
+ private static final long SEND_BUDGET_NS = TimeUnit.SECONDS.toNanos(3);
+ private static final long READER_POLL_INTERVAL_MS = 20;
+
+ private MailboxService _senderService;
+ private MailboxService _receiverService;
+
+ @BeforeClass
+ public void setUp() {
+ // Both back-pressure features (sender gate + manual receiver-side flow
control with prefetched
+ // credit) default to off; this test exists to pin the bounded-sender
behaviour they provide, so we
+ // explicitly turn both on here.
+ PinotConfiguration config = new PinotConfiguration(Map.of(
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED,
true,
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED,
true,
+
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT, 128));
+ _senderService = new MailboxService("localhost",
QueryTestUtils.getAvailablePort(),
+ InstanceType.SERVER, config);
+ _senderService.start();
+ _receiverService = new MailboxService("localhost",
QueryTestUtils.getAvailablePort(),
+ InstanceType.SERVER, config);
+ _receiverService.start();
+ }
+
+ @AfterClass
+ public void tearDown() {
+ _senderService.shutdown();
+ _receiverService.shutdown();
+ }
+
+ @Test
+ public void senderObservesBackpressureFromSlowReceiver()
+ throws Exception {
+ String mailboxId = MailboxIdUtils.toMailboxId(1, 1, 0, 0, 0);
+ long deadlineMs = System.currentTimeMillis() +
TimeUnit.MINUTES.toMillis(5);
+ StatMap<MailboxSendOperator.StatKey> stats =
+ new StatMap<>(MailboxSendOperator.StatKey.class);
+
+ SendingMailbox sender = _senderService.getSendingMailbox(
+ "localhost", _receiverService.getPort(), mailboxId, deadlineMs, stats);
+ ReceivingMailbox receiver =
_receiverService.getReceivingMailbox(mailboxId);
+ receiver.registeredReader(() -> { });
+
+ AtomicBoolean stop = new AtomicBoolean(false);
+ AtomicLong polled = new AtomicLong();
+ Thread slowReader = new Thread(() -> {
+ while (!stop.get()) {
+ ReceivingMailbox.MseBlockWithStats msg = receiver.poll();
+ if (msg != null && !msg.getBlock().isEos()) {
+ polled.incrementAndGet();
+ }
+ sleepQuiet(READER_POLL_INTERVAL_MS);
+ }
+ }, "slow-reader");
+ slowReader.setDaemon(true);
+ slowReader.start();
+
+ // Same instance, sent over and over.
+ RowHeapDataBlock block = OperatorTestUtil.block(SCHEMA, new
Object[]{PAYLOAD});
+
+ // We read memory through the `MailboxService` gauge accessors instead of
+ // `PlatformDependent.usedDirectMemory()`. The gauges:
+ // * are scoped per `MailboxService`, so the numbers don't leak in from
+ // other gRPC traffic in the same JVM;
+ // * report both direct and heap, so they stay meaningful when Netty is
+ // forced to heap (e.g. `-Dio.netty.noPreferDirect=true`);
+ // * are exactly the values exported in production as the
+ // `MAILBOX_CLIENT_USED_*` and `MAILBOX_SERVER_USED_*` gauges.
+ long baselineClient = senderClientPool(_senderService);
+ long baselineServer = receiverServerPool(_receiverService);
+ long peakClient = baselineClient;
+ long peakServer = baselineServer;
+ int sendCount = 0;
+
+ // Watchdog: with back-pressure in place, `sender.send` blocks once the
gRPC outbound is full, so a wall-clock
+ // deadline checked between sends is not enough to bound the test runtime.
We cancel the sender from a separate
+ // thread when the budget elapses, which wakes the blocked `awaitReady()`
waiter and lets the send loop exit
+ // via the `isTerminated()` check.
+ ScheduledExecutorService watchdog =
Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "test-budget-watchdog");
+ t.setDaemon(true);
+ return t;
+ });
+ watchdog.schedule(() -> sender.cancel(new RuntimeException("test budget
elapsed")),
+ SEND_BUDGET_NS, TimeUnit.NANOSECONDS);
+
+ try (QueryThreadContext ctx = QueryThreadContext.openForMseTest()) {
+ while (!sender.isTerminated()) {
+ sender.send(block);
+ sendCount++;
+ // Sample pool memory periodically rather than on every send to keep
the hot loop tight.
+ if ((sendCount & 0xff) == 0) {
+ peakClient = Math.max(peakClient, senderClientPool(_senderService));
+ peakServer = Math.max(peakServer,
receiverServerPool(_receiverService));
+ }
+ }
+ peakClient = Math.max(peakClient, senderClientPool(_senderService));
+ peakServer = Math.max(peakServer, receiverServerPool(_receiverService));
+ } finally {
+ watchdog.shutdownNow();
+ }
+
+ // RAW_MESSAGES at this point may already include the error EOS the watchdog's cancel pushed through,
+ // so the assertion below accepts `sendCount` or `sendCount + 1` instead of requiring an exact match.
+ int rawMessages = stats.getInt(MailboxSendOperator.StatKey.RAW_MESSAGES);
+
+ stop.set(true);
+ slowReader.join(TimeUnit.SECONDS.toMillis(10));
+
+ long polledCount = polled.get();
+ long clientGrowth = peakClient - baselineClient;
+ long serverGrowth = peakServer - baselineServer;
+
+ System.out.printf(Locale.ROOT,
+ "[GrpcSenderBackpressureTest] sent=%d polled=%d ratio=%.1fx%n"
+ + " sender MAILBOX_CLIENT_USED_*: direct=%dB heap=%dB (peak
growth=%dB)%n"
+ + " receiver MAILBOX_SERVER_USED_*: direct=%dB heap=%dB (peak
growth=%dB)%n",
+ sendCount, polledCount,
+ polledCount == 0 ? Double.POSITIVE_INFINITY : (double) sendCount /
polledCount,
+ _senderService.getMailboxClientUsedDirectMemoryBytes(),
+ _senderService.getMailboxClientUsedHeapMemoryBytes(),
+ clientGrowth,
+ _receiverService.getMailboxServerUsedDirectMemoryBytes(),
+ _receiverService.getMailboxServerUsedHeapMemoryBytes(),
+ serverGrowth);
+
+ // RAW_MESSAGES counts every block we pushed through processAndSend,
including the error EOS the watchdog's
+ // cancel may have emitted. So `rawMessages` is `sendCount` or `sendCount
+ 1`.
+ assertTrue(rawMessages == sendCount || rawMessages == sendCount + 1,
+ "RAW_MESSAGES (" + rawMessages + ") should equal sendCount (" +
sendCount + ") or sendCount+1");
+
+ // (1) Bounded in-flight pipeline.
+ //
+ // With the new 64 MB HTTP/2 flow-control window and 64 MB Netty
write-buffer high-water mark,
+ // the transport can buffer substantially more data before stalling the
sender. The observed
+ // ratio with these defaults is ~2200×; we allow ~3× that (7000×) as a
generous regression guard.
+ // If all back-pressure were removed the sender would exhaust direct
memory within the 3-second
+ // budget at a ratio several orders of magnitude higher — so this
threshold still catches that.
+ long allowedSendCount = polledCount * 7000;
+ assertTrue(sendCount < allowedSendCount,
+ "Sender outpaced the receiver beyond the in-flight allowance. sent=" +
sendCount
+ + " polled=" + polledCount + " allowed=" + allowedSendCount);
+
+ // (2) Bounded sender-side allocator growth (direct + heap, as summed by senderClientPool).
+ //
+ // With the wider in-flight pipeline the sender-side allocator may grow by up to one Netty pool
+ // chunk (~16 MB) during the 3-second run. Observed peak growth is ~8 MB; we cap at 25 MB
+ // (~3×) to accommodate variation while still catching unbounded allocation regressions.
+ long clientGrowthCap = 25L * 1024 * 1024;
+ assertTrue(clientGrowth < clientGrowthCap,
+ "Sender client allocator grew beyond expected steady-state. growth=" +
clientGrowth
+ + " cap=" + clientGrowthCap);
+ }
+
+ private static void sleepQuiet(long ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /// Same value the production `MAILBOX_CLIENT_USED_*` gauges report — sum of
+ /// direct + heap so the test stays valid regardless of Netty's buffer mode.
+ private static long senderClientPool(MailboxService service) {
+ return service.getMailboxClientUsedDirectMemoryBytes() +
service.getMailboxClientUsedHeapMemoryBytes();
+ }
+
+ /// Same value the production `MAILBOX_SERVER_USED_*` gauges report.
+ private static long receiverServerPool(MailboxService service) {
+ return service.getMailboxServerUsedDirectMemoryBytes() +
service.getMailboxServerUsedHeapMemoryBytes();
+ }
+}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTightGateTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTightGateTest.java
new file mode 100644
index 00000000000..3df7ceddb77
--- /dev/null
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTightGateTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+/// Pins the application-level back-pressure gate in
[GrpcSendingMailbox#awaitReady]
+/// under *narrow* transport configs, so that the gate — not the transport
layer — is
+/// the dominant mechanism keeping the sender in step with the receiver.
+///
+/// ## Why this companion test exists
+///
+/// The original [GrpcSenderBackpressureTest] runs with the wide production transport defaults
+/// (64 MiB HTTP/2 flow-control window, 64 MiB Netty WriteBufferWaterMark high / 32 MiB
+/// low). With those wide defaults the transport-level back-pressure alone bounds the
+/// `sendCount / polledCount` ratio comfortably under any reasonable assertion at the
+/// 128-byte payload used by the test. The wide-defaults assertion (`sendCount < polledCount * 7000`)
+/// therefore still passes even if a future refactor deletes the entire `awaitReady`
+/// machinery — that test alone no longer guards what it was originally built to guard.
+///
+/// A reviewer pointed this out: without a companion that *forces* the
application gate
+/// to be the dominant signal, the gate could silently regress and CI would
stay green.
+/// This test closes that gap.
+///
+/// ## What this test pins
+///
+/// The transport pipeline is narrowed to the smallest values gRPC/Netty will
accept:
+///
+/// * `pinot.query.runner.grpc.flow.control.window.bytes = 65535` — gRPC's
minimum HTTP/2
+/// stream window.
+/// * `pinot.query.runner.grpc.write.buffer.high.water.mark.bytes = 262144`
(256 KiB).
+/// * `pinot.query.runner.grpc.write.buffer.low.water.mark.bytes = 131072`
(128 KiB).
+///
+/// At those sizes the wire layer can absorb only a few hundred 128-byte
payloads before
+/// signalling back-pressure, so the application-level
[GrpcSendingMailbox#awaitReady]
+/// gate must engage and park the sender on the
[java.util.concurrent.locks.Condition]
+/// for the test to satisfy the assertion `sendCount <= polledCount * 50 +
10_000`.
+///
+/// ## Regression risk this test catches
+///
+/// If someone breaks the `bypassReady || !_backpressureEnabled` short-circuit
in
+/// `awaitReady`, removes the `Condition` wait, or otherwise deletes the
application
+/// gate while leaving the transport defaults wide, the wide-defaults
companion test
+/// will still pass — but this test will fail loudly. The sender will run away
to many
+/// thousands of times the poll rate, blowing the tight bound asserted here.
+///
+/// ## Test matrix
+///
+/// * [GrpcSenderBackpressureTest] — wide defaults, exercises gate + transport
in
+/// production-ish conditions. Loose assertion.
+/// * [GrpcSenderBackpressureOffPathTest] — wide defaults with the gate at its
production
+/// default (off); asserts the pre-fix unbounded sender behaviour is
preserved.
+/// * This test — narrow transport configs; asserts the application gate alone
is
+/// enough to keep the sender bounded.
+public class GrpcSenderBackpressureTightGateTest {
+ private static final DataSchema SCHEMA = new DataSchema(
+ new String[]{"payload"}, new ColumnDataType[]{ColumnDataType.STRING});
+ // Same small payload as the companion tests — enough to have a real
serialized
+ // body but small enough not to exhaust direct memory within the 3-second
budget.
+ private static final String PAYLOAD = "x".repeat(128);
+ private static final long SEND_BUDGET_NS = TimeUnit.SECONDS.toNanos(3);
+ private static final long READER_POLL_INTERVAL_MS = 20;
+
+ private MailboxService _senderService;
+ private MailboxService _receiverService;
+
+ @BeforeClass
+ public void setUp() {
+ // Narrow the gRPC/Netty transport pipeline to the minimum the stack will
accept,
+ // so the application-level awaitReady gate becomes the dominant
back-pressure
+ // mechanism rather than the transport layer.
+ // Window must be >= max-inbound-message-size; shrink both together so the
application gate (rather than the
+ // window-too-small-for-a-message check in GrpcMailboxServer) is what
bounds the sender. The two back-pressure
+ // features default to off, so we also have to opt in to both for the gate
to exist in this test.
+ PinotConfiguration config = new PinotConfiguration(Map.ofEntries(
+
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED,
true),
+
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED,
true),
+
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT,
128),
+
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES,
65535),
+
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES,
262144),
+
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES,
131072),
+
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
65535)));
+ _senderService = new MailboxService("localhost",
QueryTestUtils.getAvailablePort(),
+ InstanceType.SERVER, config);
+ _senderService.start();
+ _receiverService = new MailboxService("localhost",
QueryTestUtils.getAvailablePort(),
+ InstanceType.SERVER, config);
+ _receiverService.start();
+ }
+
+ @AfterClass
+ public void tearDown() {
+ _senderService.shutdown();
+ _receiverService.shutdown();
+ }
+
+ @Test
+ public void applicationGateBoundsSenderUnderNarrowTransport()
+ throws Exception {
+ String mailboxId = MailboxIdUtils.toMailboxId(1, 1, 0, 0, 0);
+ long deadlineMs = System.currentTimeMillis() +
TimeUnit.MINUTES.toMillis(5);
+ StatMap<MailboxSendOperator.StatKey> stats =
+ new StatMap<>(MailboxSendOperator.StatKey.class);
+
+ SendingMailbox sender = _senderService.getSendingMailbox(
+ "localhost", _receiverService.getPort(), mailboxId, deadlineMs, stats);
+ ReceivingMailbox receiver =
_receiverService.getReceivingMailbox(mailboxId);
+ receiver.registeredReader(() -> { });
+
+ AtomicBoolean stop = new AtomicBoolean(false);
+ AtomicLong polled = new AtomicLong();
+ Thread slowReader = new Thread(() -> {
+ while (!stop.get()) {
+ ReceivingMailbox.MseBlockWithStats msg = receiver.poll();
+ if (msg != null && !msg.getBlock().isEos()) {
+ polled.incrementAndGet();
+ }
+ sleepQuiet(READER_POLL_INTERVAL_MS);
+ }
+ }, "slow-reader");
+ slowReader.setDaemon(true);
+ slowReader.start();
+
+ // Same instance, sent over and over.
+ RowHeapDataBlock block = OperatorTestUtil.block(SCHEMA, new
Object[]{PAYLOAD});
+
+ int sendCount = 0;
+
+ // Watchdog: with the gate active the sender blocks inside awaitReady once
the gRPC
+ // outbound is full, so a wall-clock deadline checked between sends is not
enough to
+ // bound the test runtime. We cancel the sender from a separate thread
when the
+ // budget elapses, which wakes the blocked awaitReady waiter and lets the
send loop
+ // exit via the isTerminated() check.
+ ScheduledExecutorService watchdog =
Executors.newSingleThreadScheduledExecutor(r -> {
+ Thread t = new Thread(r, "test-budget-watchdog");
+ t.setDaemon(true);
+ return t;
+ });
+ watchdog.schedule(() -> sender.cancel(new RuntimeException("test budget
elapsed")),
+ SEND_BUDGET_NS, TimeUnit.NANOSECONDS);
+
+ try (QueryThreadContext ctx = QueryThreadContext.openForMseTest()) {
+ while (!sender.isTerminated()) {
+ sender.send(block);
+ sendCount++;
+ }
+ } finally {
+ watchdog.shutdownNow();
+ }
+
+ stop.set(true);
+ slowReader.join(TimeUnit.SECONDS.toMillis(10));
+
+ long polledCount = polled.get();
+
+ System.out.printf(Locale.ROOT,
+ "[GrpcSenderBackpressureTightGateTest] sent=%d polled=%d
ratio=%.1fx%n",
+ sendCount, polledCount,
+ polledCount == 0 ? Double.POSITIVE_INFINITY : (double) sendCount /
polledCount);
+
+ // Tight bound: with the application gate engaged and the transport
narrowed so it
+ // can buffer at most a few hundred small payloads, the sender should
track the
+ // 50 polls/sec reader plus a bounded in-flight pipeline. We allow
`polledCount * 50`
+ // for the per-poll headroom and an additive `10_000` constant to absorb
startup
+ // burst before the first awaitReady park. If the application gate is
silently
+ // removed, the sender races to hundreds of thousands of sends and this
fails.
+ long allowedSendCount = polledCount * 50 + 10_000;
+ assertTrue(sendCount <= allowedSendCount,
+ "Sender ran away despite narrow transport, suggesting the application
gate did not engage. "
+ + "sendCount=" + sendCount + " polledCount=" + polledCount + "
allowed=" + allowedSendCount);
+ }
+
+ private static void sleepQuiet(long ms) {
+ try {
+ Thread.sleep(ms);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSendingMailboxTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSendingMailboxTest.java
index f1e6480064e..8c67f4e7e7d 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSendingMailboxTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSendingMailboxTest.java
@@ -19,6 +19,7 @@
package org.apache.pinot.query.mailbox;
import com.google.protobuf.ByteString;
+import io.grpc.stub.ClientCallStreamObserver;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
@@ -27,15 +28,24 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.pinot.common.datablock.DataBlock;
import org.apache.pinot.common.datablock.DataBlockEquals;
import org.apache.pinot.common.datablock.DataBlockUtils;
import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.core.common.datablock.DataBlockBuilder;
import org.apache.pinot.query.mailbox.channel.ChannelManager;
import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
import org.apache.pinot.segment.spi.memory.CompoundDataBuffer;
import org.apache.pinot.segment.spi.memory.DataBuffer;
@@ -58,7 +68,7 @@ public class GrpcSendingMailboxTest {
public void sendDataThrowsWhenQueryTerminated() {
ChannelManager channelManager = Mockito.mock(ChannelManager.class);
GrpcSendingMailbox mailbox = new GrpcSendingMailbox("test-mailbox",
channelManager, "localhost", 0, Long.MAX_VALUE,
- new StatMap<>(MailboxSendOperator.StatKey.class), 4 * 1024 * 1024);
+ new StatMap<>(MailboxSendOperator.StatKey.class), 4 * 1024 * 1024,
true);
RowHeapDataBlock block = new
RowHeapDataBlock(Collections.singletonList(new Object[]{"val"}),
new DataSchema(new String[]{"foo"}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}));
@@ -71,6 +81,224 @@ public class GrpcSendingMailboxTest {
}
}
+ /// Regression test for the lazy-initialization data race on
`_contentObserver`. Before the fix, both `sendInternal`
+ /// and `cancel` had an unsynchronized `if (_contentObserver == null) {
_contentObserver = getContentObserver(); }`
+ /// pattern. `_contentObserver` is `volatile` so individual reads/writes are
atomic, but two threads racing through
+ /// the check-then-act could both observe `null` and each call
`getContentObserver()`, opening two gRPC streams for
+ /// the same mailbox id. The fix funnels both call sites through
`ensureContentObserverInitialized()`, which uses
+ /// the standard double-checked-lock idiom under `_readyLock`.
+ ///
+ /// The test subclasses [GrpcSendingMailbox] to count `getContentObserver`
calls and return a no-op observer (so we
+ /// never touch the real gRPC stack). Two threads — sender and canceller —
are synchronized on a [CyclicBarrier]
+ /// so they enter their respective methods together, then we assert exactly
one observer was opened. Multiple
+ /// iterations surface the race under timing variance.
+ @Test
+ public void concurrentSendAndCancelInitializeContentObserverExactlyOnce()
+ throws Exception {
+ int iterations = 200;
+ ChannelManager channelManager = Mockito.mock(ChannelManager.class);
+ DataSchema schema =
+ new DataSchema(new String[]{"foo"}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ try {
+ for (int i = 0; i < iterations; i++) {
+ CountingGrpcSendingMailbox mailbox = new
CountingGrpcSendingMailbox("race-mailbox-" + i, channelManager);
+ RowHeapDataBlock block = new
RowHeapDataBlock(Collections.singletonList(new Object[]{"val"}), schema);
+
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Future<?> senderFuture = executor.submit(() -> {
+ try (QueryThreadContext ctx = QueryThreadContext.openForMseTest()) {
+ barrier.await();
+ mailbox.send(block);
+ } catch (Exception e) {
+ // Sender may legitimately observe isTerminated() = true if cancel
won the race to flip the flag before
+ // sendInternal's gate check. That is the cooperative path we
want; swallow so we can still assert on the
+ // counter.
+ }
+ return null;
+ });
+
+ Future<?> cancelFuture = executor.submit(() -> {
+ try {
+ barrier.await();
+ mailbox.cancel(new RuntimeException("race-test"));
+ } catch (Exception e) {
+ // ignore: cancel is defensive and may catch exceptions from a
closed observer
+ }
+ return null;
+ });
+
+ senderFuture.get(10, TimeUnit.SECONDS);
+ cancelFuture.get(10, TimeUnit.SECONDS);
+
+ assertEquals(mailbox.getContentObserverCalls(), 1,
+ "Iteration " + i + ": expected exactly one content observer to be
opened, but got "
+ + mailbox.getContentObserverCalls()
+ + ". Two streams indicates a lazy-init race between send() and
cancel().");
+ }
+ } finally {
+ executor.shutdownNow();
+ assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), "Executor
did not terminate in time");
+ }
+ }
+
+ /// Regression test pinning the close()-on-never-sent-mailbox no-op contract.
+ ///
+ /// Before commit aeaacc893d ("Fix close() leaving the gRPC sender stream
half-open") the close() path had an
+ /// `if (_contentObserver != null)` short-circuit, so a mailbox that was
constructed, never had `send`/`cancel`
+ /// called on it, and then was closed was a silent no-op on the wire.
aeaacc893d replaced that short-circuit
+ /// with `ensureContentObserverInitialized()`, which opened a fresh gRPC
stream just to push an error EOS and
+ /// half-close — three round-trips on a stream that never needed to exist.
For query stages pruned before any
+ /// data flows that is wasted I/O and a new exception surface (channel open
can throw at shutdown).
+ ///
+ /// This test reproduces the regression: it constructs a
[CountingGrpcSendingMailbox], skips `send`/`cancel`,
+ /// calls `close()`, and asserts the content-observer counter stayed at 0.
On the regressed code the counter
+ /// goes to 1 because `ensureContentObserverInitialized()` opens a stream.
+ ///
+ /// The cancel() path intentionally keeps the eager-open behaviour — there
may be a receiver-side reader blocked
+ /// on this stream waiting for the cancel signal — so this test is scoped to
close() only.
+ @Test
+ public void closeOnNeverSentMailboxDoesNotOpenStream()
+ throws Exception {
+ ChannelManager channelManager = Mockito.mock(ChannelManager.class);
+ CountingGrpcSendingMailbox mailbox = new
CountingGrpcSendingMailbox("close-no-op", channelManager);
+
+ mailbox.close();
+
+ assertEquals(mailbox.getContentObserverCalls(), 0,
+ "close() on a never-sent mailbox must be a silent no-op — opening a
gRPC stream just to half-close it "
+ + "wastes round-trips and introduces a new exception surface at
shutdown.");
+ Mockito.verifyNoInteractions(channelManager);
+ }
+
+ /// Test subclass: counts how many times [#getContentObserver] is called and
returns a Mockito stub that mimics a
+ /// healthy stream (isReady() → true, onNext / onCompleted / cancel are
no-ops). The real gRPC machinery is never
+ /// touched, so the only thing that controls observer-opening is the
lazy-init logic inside [GrpcSendingMailbox].
+ private static final class CountingGrpcSendingMailbox extends
GrpcSendingMailbox {
+ private final AtomicInteger _getContentObserverCalls = new AtomicInteger();
+
+ CountingGrpcSendingMailbox(String id, ChannelManager channelManager) {
+ // backpressureEnabled = false so awaitReady short-circuits without
touching the observer's isReady() — we want
+ // the test to exercise the lazy-init race specifically, not the
back-pressure gate.
+ super(id, channelManager, "localhost", 0, Long.MAX_VALUE,
+ new StatMap<>(MailboxSendOperator.StatKey.class), 4 * 1024 * 1024,
false);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ ClientCallStreamObserver<MailboxContent> getContentObserver() {
+ _getContentObserverCalls.incrementAndGet();
+ ClientCallStreamObserver<MailboxContent> observer =
Mockito.mock(ClientCallStreamObserver.class);
+ Mockito.when(observer.isReady()).thenReturn(true);
+ return observer;
+ }
+
+ int getContentObserverCalls() {
+ return _getContentObserverCalls.get();
+ }
+ }
+
+ /// Regression test for the EOS-vs-cancel race that throws
`IllegalStateException("call already half-closed")`.
+ ///
+ /// Before the fix, two interleavings could escape [GrpcSendingMailbox]:
+ /// * cancel acquires `_readyLock` first, pushes its error EOS payload +
`onCompleted()`, then `send(Eos)` reaches
+ /// its outer half-close in `send(MseBlock.Eos)` and calls
`_contentObserver.onCompleted()` on the now
+ /// half-closed stream — `ClientCallStreamObserver` raises
`IllegalStateException`.
+ /// * cancel completes fully (payload + `onCompleted`) before `send(Eos)`'s
`sendContent` even acquires the lock;
+ /// `sendContent` skips its in-lock `isTerminated()` re-check because
`bypassReady=true` and calls
+ /// `_contentObserver.onNext(content)` on the half-closed stream — same
`IllegalStateException`.
+ ///
+ /// The test uses a [HalfCloseEnforcingMailbox] whose observer mimics the
gRPC contract: once `onCompleted()` has
+ /// been observed, subsequent `onNext()` / `onCompleted()` raise
`IllegalStateException`. Two threads — one calling
+ /// `send(SuccessMseBlock)` and one calling `cancel(...)` — are released
through a [CyclicBarrier] each iteration.
+ /// Across many iterations, both interleavings show up.
+ @Test
+ public void concurrentSendEosAndCancelDoesNotThrow()
+ throws Exception {
+ int iterations = 200;
+ ChannelManager channelManager = Mockito.mock(ChannelManager.class);
+
+ ExecutorService executor = Executors.newFixedThreadPool(2);
+ try {
+ for (int i = 0; i < iterations; i++) {
+ HalfCloseEnforcingMailbox mailbox = new
HalfCloseEnforcingMailbox("eos-race-" + i, channelManager);
+ CyclicBarrier barrier = new CyclicBarrier(2);
+
+ Future<Throwable> senderFuture = executor.submit(() -> {
+ Throwable thrown = null;
+ try (QueryThreadContext ctx = QueryThreadContext.openForMseTest()) {
+ barrier.await();
+ mailbox.send(SuccessMseBlock.INSTANCE, List.of());
+ } catch (Throwable t) {
+ thrown = t;
+ }
+ return thrown;
+ });
+
+ Future<Throwable> cancelFuture = executor.submit(() -> {
+ Throwable thrown = null;
+ try {
+ barrier.await();
+ mailbox.cancel(new RuntimeException("eos-race-test"));
+ } catch (Throwable t) {
+ thrown = t;
+ }
+ return thrown;
+ });
+
+ Throwable senderError = senderFuture.get(10, TimeUnit.SECONDS);
+ Throwable cancelError = cancelFuture.get(10, TimeUnit.SECONDS);
+
+ Assert.assertNull(senderError, "Iteration " + i + ": send(Eos) threw "
+ + (senderError != null ? senderError.toString() : "null")
+ + ". The EOS path must swallow IllegalStateException from a racing
cancel.");
+ Assert.assertNull(cancelError, "Iteration " + i + ": cancel() threw "
+ + (cancelError != null ? cancelError.toString() : "null"));
+ assertTrue(mailbox.isTerminated(),
+ "Iteration " + i + ": mailbox should be terminated after send(Eos)
+ cancel");
+ }
+ } finally {
+ executor.shutdownNow();
+ assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), "Executor
did not terminate in time");
+ }
+ }
+
+ /// Test subclass whose stub observer enforces the real gRPC half-close
contract: once `onCompleted()` is observed,
+ /// subsequent `onNext()` / `onCompleted()` raise
`IllegalStateException("call already half-closed")`. That's the
+ /// exact escape path Fix 1 prevents — without it, `send(Eos)` and `cancel`
racing would propagate this exception
+ /// out of `GrpcSendingMailbox`.
+ private static final class HalfCloseEnforcingMailbox extends
GrpcSendingMailbox {
+ HalfCloseEnforcingMailbox(String id, ChannelManager channelManager) {
+ // backpressureEnabled = false so awaitReady short-circuits without
spinning on isReady(); we want to exercise
+ // the half-close ordering, not the back-pressure gate.
+ super(id, channelManager, "localhost", 0, Long.MAX_VALUE,
+ new StatMap<>(MailboxSendOperator.StatKey.class), 4 * 1024 * 1024,
false);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ ClientCallStreamObserver<MailboxContent> getContentObserver() {
+ ClientCallStreamObserver<MailboxContent> observer =
Mockito.mock(ClientCallStreamObserver.class);
+ AtomicBoolean halfClosed = new AtomicBoolean();
+ Mockito.when(observer.isReady()).thenReturn(true);
+ Mockito.doAnswer(invocation -> {
+ if (halfClosed.get()) {
+ throw new IllegalStateException("call already half-closed");
+ }
+ return null;
+ }).when(observer).onNext(Mockito.any());
+ Mockito.doAnswer(invocation -> {
+ if (!halfClosed.compareAndSet(false, true)) {
+ throw new IllegalStateException("call already half-closed");
+ }
+ return null;
+ }).when(observer).onCompleted();
+ return observer;
+ }
+ }
+
@Test(dataProvider = "byteBuffersDataProvider")
public void testByteBuffersToByteStrings(int[] byteBufferSizes, int
maxByteStringSize) {
List<ByteBuffer> input =
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/ChannelManagerTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/ChannelManagerTest.java
index 1ba1da8bc8f..11faa327dc1 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/ChannelManagerTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/ChannelManagerTest.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Field;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
@@ -38,14 +39,18 @@ public class ChannelManagerTest {
@Test
public void testResetConnectBackoffNoOpForUnknownChannel() {
- ChannelManager channelManager = new ChannelManager(null, 4_000_000,
Duration.ofDays(365));
+ ChannelManager channelManager = new ChannelManager(null, 4_000_000,
Duration.ofDays(365),
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES);
// Should return false and not throw when no channel exists for the given
host/port
assertFalse(channelManager.resetConnectBackoff("unknown-host", 12345));
}
@Test
public void testResetConnectBackoffNoOpWhenNotInTransientFailure() {
- ChannelManager channelManager = new ChannelManager(null, 4_000_000,
Duration.ofDays(365));
+ ChannelManager channelManager = new ChannelManager(null, 4_000_000,
Duration.ofDays(365),
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES);
// Create a channel by calling getChannel
ManagedChannel channel = channelManager.getChannel("localhost", 12345);
try {
@@ -63,7 +68,9 @@ public class ChannelManagerTest {
@SuppressWarnings("unchecked")
public void testResetConnectBackoffResetsWhenInTransientFailure()
throws Exception {
- ChannelManager channelManager = new ChannelManager(null, 4_000_000,
Duration.ofDays(365));
+ ChannelManager channelManager = new ChannelManager(null, 4_000_000,
Duration.ofDays(365),
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES,
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES);
ManagedChannel mockChannel = mock(ManagedChannel.class);
when(mockChannel.getState(false)).thenReturn(ConnectivityState.TRANSIENT_FAILURE);
@@ -78,4 +85,26 @@ public class ChannelManagerTest {
assertTrue(channelManager.resetConnectBackoff("failing-host", 9999));
verify(mockChannel).resetConnectBackoff();
}
+
+ /// Pins the fail-fast gate added in commit 1d29438dc0 ("Fail fast on
invalid gRPC mailbox transport
+ /// configuration"): a non-positive `writeBufferLowWaterMarkBytes` must
throw at startup rather than
+ /// surfacing later as a Netty `WriteBufferWaterMark` constructor failure on
the first send.
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = ".*writeBufferLowWaterMarkBytes must
be positive.*")
+ public void testConstructorRejectsZeroWriteBufferLowWaterMark() {
+ new ChannelManager(null, 4_000_000, Duration.ofDays(365),
+
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES,
+ 0);
+ }
+
+ /// Pins the eager `new WriteBufferWaterMark(low, high)` invariant: when
`low > high`, Netty's own
+ /// constructor throws `IllegalArgumentException`. Constructing the
watermark eagerly in
+ /// `ChannelManager` (added in 1d29438dc0) is what makes this surface at
startup instead of on the
+ /// first send to a previously-unseen peer.
+ @Test(expectedExceptions = IllegalArgumentException.class)
+ public void testConstructorRejectsLowWatermarkAboveHighWatermark() {
+ new ChannelManager(null, 4_000_000, Duration.ofDays(365),
+ 32 * 1024 * 1024, // high
+ 64 * 1024 * 1024); // low > high
+ }
}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServerValidationTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServerValidationTest.java
new file mode 100644
index 00000000000..307b2da343a
--- /dev/null
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServerValidationTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import java.util.Map;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
+import org.testng.annotations.Test;
+
+
+/// Negative-path tests for the fail-fast startup gates added to {@link
GrpcMailboxServer} in commit
+/// 1d29438dc0 ("Fail fast on invalid gRPC mailbox transport configuration").
Each test wires the
+/// corresponding bad configuration value through a {@link MailboxService};
the underlying
+/// {@code Preconditions.checkArgument} fires inside the {@code
GrpcMailboxServer} constructor that
+/// {@link MailboxService#start()} invokes, so the failure surfaces during
{@code start()} — which is
+/// the whole point of fail-at-startup.
+public class GrpcMailboxServerValidationTest {
+
+ /// Pins the gate {@code _inboundMessageCredit > 0} in
+ /// {@link GrpcMailboxServer}: a zero credit value makes the manual inbound
flow-control prefetch a no-op
+ /// (the server never calls {@code request(...)} with a positive amount) so
the stream would stall
+ /// immediately. The startup gate ensures this misconfiguration is rejected
at boot rather than producing
+ /// silent hangs on the first query.
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = ".*inbound.message.credit.*must be
positive.*")
+ public void testStartRejectsZeroInboundMessageCredit() {
+ PinotConfiguration config = new PinotConfiguration(Map.of(
+ MultiStageQueryRunner.KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT, 0));
+ MailboxService mailboxService = new MailboxService(
+ "localhost", QueryTestUtils.getAvailablePort(), InstanceType.BROKER,
config);
+ // start() builds the GrpcMailboxServer; the precondition fires inside
that constructor and propagates
+ // out — no shutdown needed because the server never finished starting.
+ mailboxService.start();
+ }
+
+ /// Pins the gate {@code _flowControlWindowBytes >= maxInboundMessageSize} in
+ /// {@link GrpcMailboxServer}: a window smaller than the largest possible
single message produces a
+ /// pathological stream where {@code isReady()} flaps on every message
because no single message can
+ /// ever fit in the available credit. Verifies that a 1 KiB flow-control
window combined with the
+ /// default 16 MiB max inbound message size is rejected at startup.
+ @Test(expectedExceptions = IllegalArgumentException.class,
+ expectedExceptionsMessageRegExp = ".*flow.control.window.*must be
>=.*max.msg.size.*")
+ public void testStartRejectsFlowControlWindowSmallerThanMaxMessageSize() {
+ PinotConfiguration config = new PinotConfiguration(Map.of(
+ MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES, 1024));
+ // Leave KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES at its default (16
MiB) so the gate is
+ // exercised purely by the too-small flow-control window.
+ MailboxService mailboxService = new MailboxService(
+ "localhost", QueryTestUtils.getAvailablePort(), InstanceType.BROKER,
config);
+ mailboxService.start();
+ }
+}
diff --git
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserverTest.java
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserverTest.java
index 7414195c63f..866e2592fc2 100644
---
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserverTest.java
+++
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserverTest.java
@@ -19,7 +19,7 @@
package org.apache.pinot.query.mailbox.channel;
import com.google.protobuf.ByteString;
-import io.grpc.stub.StreamObserver;
+import io.grpc.stub.ServerCallStreamObserver;
import org.apache.pinot.common.proto.Mailbox.MailboxContent;
import org.apache.pinot.common.proto.Mailbox.MailboxStatus;
import org.apache.pinot.query.mailbox.MailboxService;
@@ -38,8 +38,10 @@ public class MailboxContentObserverTest {
MailboxService mailboxService = mock(MailboxService.class);
ReceivingMailbox receivingMailbox = mock(ReceivingMailbox.class);
when(mailboxService.getReceivingMailbox(TEST_MAILBOX_ID)).thenReturn(receivingMailbox);
- StreamObserver<MailboxStatus> mockStatusObserver =
mock(StreamObserver.class);
- MailboxContentObserver observer = new
MailboxContentObserver(mailboxService, TEST_MAILBOX_ID, mockStatusObserver);
+ @SuppressWarnings("unchecked")
+ ServerCallStreamObserver<MailboxStatus> mockStatusObserver =
mock(ServerCallStreamObserver.class);
+ MailboxContentObserver observer =
+ new MailboxContentObserver(mailboxService, TEST_MAILBOX_ID,
mockStatusObserver, true);
verify(mailboxService, times(1)).getReceivingMailbox(TEST_MAILBOX_ID);
// Now simulate receiving a mailbox content message
@@ -52,4 +54,56 @@ public class MailboxContentObserverTest {
verify(mailboxService, times(1)).getReceivingMailbox(TEST_MAILBOX_ID);
verifyNoMoreInteractions(mailboxService);
}
+
+ @Test
+ public void testOnNextReplenishesInboundCredit() {
+ MailboxService mailboxService = mock(MailboxService.class);
+ ReceivingMailbox receivingMailbox = mock(ReceivingMailbox.class);
+
when(mailboxService.getReceivingMailbox(TEST_MAILBOX_ID)).thenReturn(receivingMailbox);
+ @SuppressWarnings("unchecked")
+ ServerCallStreamObserver<MailboxStatus> mockStatusObserver =
mock(ServerCallStreamObserver.class);
+ MailboxContentObserver observer =
+ new MailboxContentObserver(mailboxService, TEST_MAILBOX_ID,
mockStatusObserver, true);
+
+ // Each onNext call should request(1) exactly once, regardless of whether
the stream is already closed.
+ MailboxContent mockContent = mock(MailboxContent.class);
+ when(mockContent.getPayload()).thenReturn(ByteString.copyFrom(new
byte[0]));
+ when(mockContent.getMailboxId()).thenReturn(TEST_MAILBOX_ID);
+
+ observer.onNext(mockContent);
+ verify(mockStatusObserver, times(1)).request(1);
+
+ observer.onNext(mockContent);
+ verify(mockStatusObserver, times(2)).request(1);
+ }
+
+ /// Asserts the manual-inbound-flow-control kill-switch
+ /// (`pinot.query.runner.grpc.manual.inbound.flow.control.enabled` =
`false`) really removes the
+ /// `request(1)` replenishment from the top of `onNext`. With the flag off,
gRPC's auto-inbound is in
+ /// place and will replenish 1 credit after `onNext` returns; calling
`request(1)` here too would
+ /// double-count and defeat the rollback.
+ ///
+ /// If someone removes the `if (_manualInboundFlowControlEnabled)` guard,
this test fails and surfaces
+ /// the regression in CI.
+ @Test
+ public void testOnNextDoesNotReplenishCreditWhenManualFlowControlDisabled() {
+ MailboxService mailboxService = mock(MailboxService.class);
+ ReceivingMailbox receivingMailbox = mock(ReceivingMailbox.class);
+
when(mailboxService.getReceivingMailbox(TEST_MAILBOX_ID)).thenReturn(receivingMailbox);
+ @SuppressWarnings("unchecked")
+ ServerCallStreamObserver<MailboxStatus> mockStatusObserver =
mock(ServerCallStreamObserver.class);
+ MailboxContentObserver observer =
+ new MailboxContentObserver(mailboxService, TEST_MAILBOX_ID,
mockStatusObserver, false);
+
+ MailboxContent mockContent = mock(MailboxContent.class);
+ when(mockContent.getPayload()).thenReturn(ByteString.copyFrom(new
byte[0]));
+ when(mockContent.getMailboxId()).thenReturn(TEST_MAILBOX_ID);
+
+ observer.onNext(mockContent);
+ observer.onNext(mockContent);
+
+ // With manual flow control disabled, onNext must NOT call request(1) —
gRPC's auto-inbound machinery
+ // handles credit replenishment after onNext returns.
+ verify(mockStatusObserver, never()).request(anyInt());
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 5db5055117e..840dd97db51 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -2335,6 +2335,139 @@ public class CommonConstants {
public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
= "pinot.query.runner.max.msg.size.bytes";
public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES =
16 * 1024 * 1024;
+ /// Whether the sender side of every `GrpcSendingMailbox` respects gRPC
client-side flow control by waiting
+ /// on [io.grpc.stub.ClientCallStreamObserver#isReady] before pushing each
chunk.
+ ///
+ /// Default `false` — the gate is **opt-in**. When `false`, the sender
pushes unconditionally and the
+ /// behaviour is identical to the pre-PR-#18519 unbounded path. Set to
`true` to engage the
+ /// `isReady()`-gated wait that bounds the gRPC client allocator against
the `OutOfDirectMemoryError`
+ /// failure mode described in #18519. Operators who hit that OOM (slow
consumer / large fan-out / skewed
+ /// shuffle) should flip this on.
+ ///
+ /// Also used as an A/B knob for benchmarks (see
`BenchmarkGrpcMailboxSend`).
+ public static final String KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED =
+ "pinot.query.runner.grpc.sender.backpressure.enabled";
+ public static final boolean DEFAULT_GRPC_SENDER_BACKPRESSURE_ENABLED =
false;
+
+ /// Per-stream HTTP/2 flow control window, in bytes. The receiver
advertises this value to the sender as
+ /// the number of bytes it will accept before requiring a `WINDOW_UPDATE`
frame. Wider windows let the
+ /// sender push a whole `MseBlock` without
[io.grpc.stub.ClientCallStreamObserver#isReady] flipping
+ /// mid-block. Applied via `NettyServerBuilder.flowControlWindow` in
`GrpcMailboxServer`.
+ ///
+ /// This is per HTTP/2 stream, so total inbound buffering at the receiver
scales as
+ /// `value × #concurrent streams to this server`. Concretely:
+ /// `Peak receiver direct memory ≈ flowControlWindow ×
#concurrent_incoming_streams.`
+ ///
+ /// This value is the **per-stalled-stream receiver-side direct-memory
exposure**, not just a throughput
+ /// knob: when an inbound stream's receiver application queue stalls (e.g.
the downstream operator is slow
+ /// to drain via
[org.apache.pinot.query.mailbox.channel.MailboxContentObserver#onNext]), the
wire can
+ /// still buffer up to `flowControlWindow` bytes of data on that stream
before the HTTP/2 peer stops
+ /// sending.
+ ///
+ /// This is a direct-memory bound, not just a throughput knob: operators
must size it against
+ /// `-XX:MaxDirectMemorySize` given the expected concurrent inbound stream
count.
+ ///
+ /// Receiver-side counterpart to
[#KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES] (the sender-side
+ /// outbound queue cap). The two are aligned at the same default by design
— they cap roughly the
+ /// same conceptual thing (one peer's worth of in-flight bytes) from the
two ends of the wire — but
+ /// kept as separate keys so operators can tune them independently for
asymmetric workloads.
+ public static final String KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES =
+ "pinot.query.runner.grpc.flow.control.window.bytes";
+ public static final int DEFAULT_GRPC_FLOW_CONTROL_WINDOW_BYTES = 64 * 1024
* 1024;
+
+ /// Netty per-channel WriteQueue high watermark, in bytes. Applied via
+ /// `ChannelOption.WRITE_BUFFER_WATER_MARK` on the sender's
`NettyChannelBuilder`. When the channel's
+ /// outbound queue exceeds this value, `Channel.isWritable()` flips to
`false` and gRPC's
+ /// [io.grpc.stub.ClientCallStreamObserver#isReady] returns `false` until
the queue drops below the low
+ /// watermark.
+ ///
+ /// This is a per-channel (per `host:port`) setting, shared across all
streams to that peer. The
+ /// sender's direct-memory footprint is therefore bounded by `value ×
#peers`, not by
+ /// `value × #streams`. Concretely:
+ /// `Peak sender direct memory ≈ writeBufferHighWaterMark × #peers (one
channel per peer, shared across
+ /// streams to that peer).`
+ ///
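+ /// This is a direct-memory bound, not just a throughput knob: operators must size it against
+ /// `-XX:MaxDirectMemorySize` given the number of distinct peers this process sends to across all
+ /// concurrent queries (the union of their fan-outs, not their product). Because the channel is shared by
+ /// all streams to a peer, concurrent queries targeting the same peer do not multiply this bound.
+ /// Pairs with [#KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES].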
+ ///
+ /// Sender-side counterpart to [#KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES] (the receiver-side inbound
+ /// window). The two are aligned at the same default by design, sized together so that each end of the wire
+ /// bounds its own in-flight bytes: per channel (peer) here on the sender, per stream on the receiver.
+ public static final String KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES =
+ "pinot.query.runner.grpc.write.buffer.high.water.mark.bytes";
+ public static final int DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES =
64 * 1024 * 1024; // 64 MiB per channel (one channel per peer)
+
+ /// Netty per-channel WriteQueue low watermark, in bytes. Once the WriteQueue has exceeded the high
+ /// watermark (see [#KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES] and the
+ /// `writeBufferHighWaterMark × #peers` direct-memory formula documented there), it must drop below this
+ /// value before `Channel.isWritable()` flips back to `true`. Conventionally set to ~50% of the high
+ /// watermark. It must not exceed the high watermark: Netty's `WriteBufferWaterMark` rejects
+ /// `low > high`, so lowering only the high watermark below this key's default requires lowering this too.
+ ///
+ /// The gap `(high − low)` is the drain hysteresis the channel must clear
before becoming writable
+ /// again: setting `low` too close to `high` makes the channel flap
writable/unwritable on every
+ /// small drain; setting it too low forces the sender to wait longer
between writable windows. The
+ /// low watermark itself does not change the peak direct-memory bound —
that is set by the high
+ /// watermark — but it controls how aggressively the channel reopens once
back-pressure has engaged.
+ public static final String KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES =
+ "pinot.query.runner.grpc.write.buffer.low.water.mark.bytes";
+ public static final int DEFAULT_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES =
32 * 1024 * 1024; // 32 MiB, i.e. 50% of the default 64 MiB high watermark
+
+ /// Number of inbound gRPC messages the receiver will accept in flight per stream before requiring the
+ /// application to consume one. Implemented by disabling gRPC's default auto-inbound-flow-control on the
+ /// server side and calling [io.grpc.stub.ServerCallStreamObserver#request] explicitly; one credit is
+ /// replenished in [org.apache.pinot.query.mailbox.channel.MailboxContentObserver#onNext] before its
+ /// (possibly blocking) hand-off to the application queue, not after `onNext` returns. Only takes effect
+ /// when [#KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED] is `true` (off by default).
+ ///
+ /// Default `1`, which approximates gRPC's auto-inbound-flow-control behaviour (one message of credit at a
+ /// time; unlike auto-inbound, the credit is replenished before the hand-off rather than after `onNext`
+ /// returns). Even when [#KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED] is flipped on, this
+ /// conservative default keeps the in-flight window at one message until the operator explicitly widens it.
+ ///
+ /// Larger values let the sender pipeline more messages without waiting
for per-message round trips,
+ /// which is the primary throughput knob for small / medium MSE blocks.
Memory exposure on the receiver
+ /// is still bounded by the HTTP/2 stream window (see
[#KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES]), so this
+ /// credit count is effectively a per-stream message-count limit on top of
the byte-count limit.
+ /// Whichever fires first applies.
+ ///
+ /// ## Cancel-propagation tradeoff
+ ///
+ /// Higher credit values widen the in-flight window, which improves
throughput for small/medium blocks
+ /// but also **widens worst-case cancel-propagation latency** when the
receiver's application queue
+ /// (capacity 5 by default) is stuck. The sender's
+ /// [org.apache.pinot.query.mailbox.GrpcSendingMailbox#cancel] pushes an
error EOS **in-band** on the
+ /// same gRPC stream as data; when the receiver's dispatch thread is
parked in `_notFull.await`, that
+ /// EOS sits behind every inbound message that already made it past flow
control. Worst-case cancel
+ /// latency is bounded by `min(credit messages, flowControlWindow bytes)`
worth of buffered inbound that
+ /// has to drain before the EOS reaches the application.
+ ///
+ /// Note that this hang surface is **pre-existing** — the in-band EOS path
can stall even with gRPC's
+ /// auto-inbound default of 1 in-flight message if the receiver's
application queue is permanently
+ /// stuck (e.g. the consumer is gone). The credit value just controls how
much worse the latency gets
+ /// before the hang surfaces. See
https://github.com/apache/pinot/issues/18541 for the proper
+ /// out-of-band cancel work.
+ public static final String KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT =
+ "pinot.query.runner.grpc.inbound.message.credit";
+ public static final int DEFAULT_GRPC_INBOUND_MESSAGE_CREDIT = 1; // one message of credit per stream
+
+ /// Whether the receiver overrides gRPC's auto-inbound-flow-control on the
mailbox stream and prefetches
+ /// [#KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT] messages of credit up-front,
then replenishes one credit
+ /// before each `onNext` does the (possibly blocking) hand-off to the
application queue.
+ ///
+ /// Default `false` — the manual-flow-control path is **opt-in**. When
`false` (default), the receiver
+ /// leaves gRPC's auto-inbound in place (only 1 message in flight at a
time, post-`onNext`-return credit
+ /// replenishment), which is the pre-PR-#18519 behaviour. Set to `true` to
engage the manual prefetch +
+ /// pre-`offerRaw` credit replenishment introduced in #18519, which is the
primary throughput knob for
+ /// small/medium MSE blocks.
+ ///
+ /// Cancel-propagation latency is bounded more tightly when this is
`false`, but the worst case (a stuck
+ /// receiver dispatch thread) is still possible because the sender's
cancel travels in-band; see
+ /// https://github.com/apache/pinot/issues/18541.
+ ///
+ /// This is an independent opt-in from
[#KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED]; the two control
+ /// different sides of the mailbox path.
+ public static final String KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED
=
+ "pinot.query.runner.grpc.manual.inbound.flow.control.enabled";
+ public static final boolean
DEFAULT_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED = false; // opt-in: gRPC auto-inbound flow control stays in place by default
/**
* Configuration for channel idle timeout in seconds.
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]