This is an automated email from the ASF dual-hosted git repository.

gortiz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new f00dc04af11 [multistage] Sender-side gRPC back-pressure for mailbox + 
MAILBOX_CLIENT_USED_* gauges (#18519)
f00dc04af11 is described below

commit f00dc04af111f4ba4e5ef7c92e93c0eef1dc3082
Author: Gonzalo Ortiz Jaureguizar <[email protected]>
AuthorDate: Wed May 27 18:13:32 2026 +0200

    [multistage] Sender-side gRPC back-pressure for mailbox + 
MAILBOX_CLIENT_USED_* gauges (#18519)
---
 .../apache/pinot/common/metrics/BrokerGauge.java   |   4 +
 .../apache/pinot/common/metrics/ServerGauge.java   |   4 +
 .../pinot/perf/BenchmarkGrpcMailboxSend.java       | 263 ++++++++++++++
 .../pinot/query/mailbox/GrpcSendingMailbox.java    | 376 +++++++++++++++++++--
 .../apache/pinot/query/mailbox/MailboxService.java | 101 +++++-
 .../query/mailbox/channel/ChannelManager.java      |  38 ++-
 .../query/mailbox/channel/GrpcMailboxServer.java   |  68 +++-
 .../mailbox/channel/MailboxContentObserver.java    |  19 +-
 .../mailbox/GrpcSenderBackpressureOffPathTest.java | 187 ++++++++++
 .../query/mailbox/GrpcSenderBackpressureTest.java  | 251 ++++++++++++++
 .../GrpcSenderBackpressureTightGateTest.java       | 220 ++++++++++++
 .../query/mailbox/GrpcSendingMailboxTest.java      | 230 ++++++++++++-
 .../query/mailbox/channel/ChannelManagerTest.java  |  35 +-
 .../channel/GrpcMailboxServerValidationTest.java   |  71 ++++
 .../channel/MailboxContentObserverTest.java        |  60 +++-
 .../apache/pinot/spi/utils/CommonConstants.java    | 133 ++++++++
 16 files changed, 2016 insertions(+), 44 deletions(-)

diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
index 3d5b2ae9366..b730aa0e9d5 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/BrokerGauge.java
@@ -106,6 +106,10 @@ public enum BrokerGauge implements AbstractMetrics.Gauge {
   MAILBOX_SERVER_THREADLOCALCACHE("bytes", true),
   MAILBOX_SERVER_CHUNK_SIZE("bytes", true),
 
+  // MailboxService gRPC client (outbound to peer mailboxes) memory metrics
+  MAILBOX_CLIENT_USED_DIRECT_MEMORY("bytes", true),
+  MAILBOX_CLIENT_USED_HEAP_MEMORY("bytes", true),
+
   /// Exports the max amount of direct memory that can be allocated by the 
shaded Netty code used by gRPC
   /// It is basically an adaptor for 
io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.maxDirectMemory()
   ///
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index c34990d0a23..c2dba897aec 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -126,6 +126,10 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
   MAILBOX_SERVER_THREADLOCALCACHE("bytes", true),
   MAILBOX_SERVER_CHUNK_SIZE("bytes", true),
 
+  // MailboxService gRPC client (outbound to peer mailboxes) memory metrics
+  MAILBOX_CLIENT_USED_DIRECT_MEMORY("bytes", true),
+  MAILBOX_CLIENT_USED_HEAP_MEMORY("bytes", true),
+
   /// Exports the max amount of direct memory that can be allocated by Netty
   /// It is basically an adaptor for 
io.netty.util.internal.PlatformDependent.maxDirectMemory()
   ///
diff --git 
a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkGrpcMailboxSend.java 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkGrpcMailboxSend.java
new file mode 100644
index 00000000000..ff4bff51577
--- /dev/null
+++ 
b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkGrpcMailboxSend.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.SendingMailbox;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+/// Benchmark for the gRPC sender → receiver MSE mailbox path.
+///
+/// One @Benchmark invocation sends a fixed total payload (`TOTAL_BYTES = 128 
MiB`) split into
+/// pre-computed `MseBlock.Data` blocks of size `_blockSizeBytes`, and waits 
until every block has
+/// been polled out of the receiver mailbox. Measured time is therefore 
end-to-end "ship a request
+/// of this size and process it on the other side" — closer to the unit Pinot 
actually cares about
+/// than throughput of one-block-at-a-time hot-loop send.
+///
+/// Axes:
+///
+///  * `_blockSizeBytes` — three representative block sizes:
+///      - `8 KiB`   → 16384 blocks per invocation; sender is dominated by per-block overhead.
+///      - `8 MiB`   → 16 blocks; one block fits in a single gRPC chunk (`maxInboundMessageSize / 2 ≈ 8 MiB`).
+///      - `32 MiB`  → 4 blocks; each block is split by `toByteStrings` into ~4 gRPC chunks.
+///    Chunk counts assume the 16 MiB default max message size; `setup` clamps it down to narrower windows.
+///  * `_backpressureEnabled` — toggles the `GrpcSendingMailbox` 
`isReady()`-gate. `false` reverts
+///    to the pre-fix unconditional `onNext` (kill-switch).
+///  * `_flowControlWindowBytes` — HTTP/2 per-stream inbound window the 
receiver advertises
+///    (`pinot.query.runner.grpc.flow.control.window.bytes`). Sweeps `{64 KiB, 
1 MiB, 64 MiB}`,
+///    spanning the historical default, BDP-estimated LAN value, and the new 
MB-scale default.
+///
+/// The drainer runs as a separate thread (modelling the cross-operator 
boundary in a real query),
+/// blocks on a `Semaphore` released by the receiver's `Reader` callback, and 
counts down a
+/// per-invocation `CountDownLatch` for each polled block. The @Benchmark 
thread sends every block
+/// then awaits the latch.
+///
+/// Run with `org.openjdk.jmh.Main 
org.apache.pinot.perf.BenchmarkGrpcMailboxSend`.
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(value = 1, jvmArgsAppend = {
+    "-Xms2g", "-Xmx4g",
+    "-XX:MaxDirectMemorySize=4g"
+})
+@Warmup(iterations = 2, time = 5)
+@Measurement(iterations = 3, time = 10)
+@State(Scope.Benchmark)
+public class BenchmarkGrpcMailboxSend {
+
+  /// Total bytes shipped per @Benchmark invocation. Sized so that even at 32 
MiB-per-block we
+  /// get a handful of blocks to bound the per-invocation latency.
+  private static final long TOTAL_BYTES = 128L * 1024 * 1024;
+
+  private static final DataSchema SCHEMA = new DataSchema(
+      new String[]{"payload"}, new ColumnDataType[]{ColumnDataType.STRING});
+
+  /// Approximate bytes per `MseBlock.Data` payload. Each block carries a 
single String column,
+  /// one row, where the String is roughly `_blockSizeBytes` characters (ASCII 
→ 1 byte per char
+  /// when serialised on the wire, ~2 bytes on heap). The number of blocks per 
invocation is
+  /// `ceil(TOTAL_BYTES / _blockSizeBytes)`.
+  @Param({"8192", "8388608", "33554432"})
+  public int _blockSizeBytes;
+
+  /// `true` exercises the sender-side `isReady()`-gate; `false` restores the 
pre-fix
+  /// unconditional `onNext` (kill-switch). Wired via
+  /// `pinot.query.runner.grpc.sender.backpressure.enabled`.
+  @Param({"true", "false"})
+  public boolean _backpressureEnabled;
+
+  /// HTTP/2 per-stream flow-control window the receiver advertises, in bytes
+  /// (`pinot.query.runner.grpc.flow.control.window.bytes`):
+  ///  * `65535`    (~64 KiB) — historical gRPC default; gate engages 
frequently.
+  ///  * `1048576`  (~1 MiB)  — typical BDP-estimated LAN value.
+  ///  * `67108864` (64 MiB)  — Pinot's new default; gate rarely engages under 
typical loads.
+  @Param({"65535", "1048576", "67108864"})
+  public int _flowControlWindowBytes;
+
+  private MailboxService _senderService;
+  private MailboxService _receiverService;
+  private SendingMailbox _sender;
+  private ReceivingMailbox _receiver;
+  private List<RowHeapDataBlock> _blocks;
+
+  // Drainer signalling.
+  private final Semaphore _readSignal = new Semaphore(0);
+  private final AtomicBoolean _stop = new AtomicBoolean();
+  private Thread _drainer;
+  private volatile CountDownLatch _drainedLatch;
+
+  @Setup
+  public void setup()
+      throws IOException {
+    // The GrpcMailboxServer fail-fast validation requires flowControlWindow >= maxInboundMessageSize. The narrow window
+    // axis values (65535, 1 MiB) are smaller than the 16 MiB default for max-inbound-message-size, so Math.min clamps
+    // max-msg-size down to the window for those two cells so the validation passes, while the 64 MiB-window cell keeps
+    // the production default 16 MiB max-msg-size.
+    // Manual inbound flow control defaults to off; this bench specifically 
measures the feature-enabled
+    // throughput across window sizes, so we explicitly opt in and set the 
credit to 128 (the original
+    // "as designed" value before the production default was switched to 1).
+    PinotConfiguration cfg = new PinotConfiguration(Map.of(
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED, 
_backpressureEnabled,
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED,
 true,
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT, 128,
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES, 
_flowControlWindowBytes,
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+            Math.min(_flowControlWindowBytes,
+                
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)));
+    _senderService = new MailboxService("localhost", availablePort(), 
InstanceType.SERVER, cfg);
+    _senderService.start();
+    _receiverService = new MailboxService("localhost", availablePort(), 
InstanceType.SERVER, cfg);
+    _receiverService.start();
+
+    // Inlined MailboxIdUtils.toMailboxId(1L, 1, 0, 0, 0) format
+    // (requestId_senderStage_senderWorker_recvStage_recvWorker).
+    String mailboxId = "1_1_0_0_0";
+    long deadlineMs = System.currentTimeMillis() + TimeUnit.HOURS.toMillis(1);
+    _sender = _senderService.getSendingMailbox("localhost", 
_receiverService.getPort(),
+        mailboxId, deadlineMs, new 
StatMap<>(MailboxSendOperator.StatKey.class));
+    _receiver = _receiverService.getReceivingMailbox(mailboxId);
+    // Reader callback fires once per `offer` — wake the drainer.
+    _receiver.registeredReader(_readSignal::release);
+
+    // Pre-compute the block list. All blocks share the same payload String 
(immutable, so
+    // sharing is safe and keeps heap usage bounded). The serialiser still 
emits independent
+    // byte buffers per send.
+    int numBlocks = (int) ((TOTAL_BYTES + _blockSizeBytes - 1) / 
_blockSizeBytes);
+    char[] chars = new char[_blockSizeBytes];
+    Arrays.fill(chars, 'x');
+    String payload = new String(chars);
+    _blocks = new ArrayList<>(numBlocks);
+    for (int i = 0; i < numBlocks; i++) {
+      _blocks.add(new RowHeapDataBlock(
+          Collections.singletonList(new Object[]{payload}), SCHEMA));
+    }
+
+    // Drainer: block on the reader signal, then drain everything available 
with non-blocking
+    // poll. The semaphore may accumulate spurious permits (we poll more 
aggressively than the
+    // reader fires) but those just cause a no-op wakeup later, which is 
harmless.
+    _drainer = new Thread(() -> {
+      while (!_stop.get()) {
+        try {
+          _readSignal.acquire();
+        } catch (InterruptedException e) {
+          return;
+        }
+        while (_receiver.poll() != null) {
+          CountDownLatch latch = _drainedLatch;
+          if (latch != null) {
+            latch.countDown();
+          }
+        }
+      }
+    }, "bench-grpc-mailbox-drainer");
+    _drainer.setDaemon(true);
+    _drainer.start();
+  }
+
+  private static int availablePort()
+      throws IOException {
+    try (ServerSocket socket = new ServerSocket(0)) {
+      return socket.getLocalPort();
+    }
+  }
+
+  @TearDown
+  public void teardown()
+      throws InterruptedException {
+    try {
+      _sender.cancel(new RuntimeException("bench done"));
+    } catch (Exception ignored) {
+      // best-effort
+    }
+    _stop.set(true);
+    _readSignal.release();
+    _drainer.join(TimeUnit.SECONDS.toMillis(10));
+    _senderService.shutdown();
+    _receiverService.shutdown();
+  }
+
+  /// `GrpcSendingMailbox.send` calls 
`QueryThreadContext.checkTerminationAndSampleUsage`, which
+  /// requires a context to be open on the calling thread. Opening it as a 
thread-scoped @State
+  /// ensures the JMH worker thread has one open across the iteration.
+  @State(Scope.Thread)
+  public static class ThreadCtx {
+    private QueryThreadContext _ctx;
+
+    @Setup
+    public void setup() {
+      _ctx = QueryThreadContext.openForMseTest();
+    }
+
+    @TearDown
+    public void tearDown() {
+      _ctx.close();
+    }
+  }
+
+  @Benchmark
+  public void sendRequest(ThreadCtx ignored)
+      throws InterruptedException {
+    CountDownLatch latch = new CountDownLatch(_blocks.size());
+    _drainedLatch = latch;
+    for (RowHeapDataBlock block : _blocks) {
+      _sender.send(block);
+    }
+    latch.await();
+  }
+
+  public static void main(String[] args)
+      throws Exception {
+    new Runner(new OptionsBuilder()
+        .include(BenchmarkGrpcMailboxSend.class.getSimpleName())
+        .build()).run();
+  }
+}
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
index 75db17a105e..6079f72b9e7 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java
@@ -22,8 +22,9 @@ import com.google.common.collect.Maps;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.UnsafeByteOperations;
 import io.grpc.Metadata;
+import io.grpc.stub.ClientCallStreamObserver;
+import io.grpc.stub.ClientResponseObserver;
 import io.grpc.stub.MetadataUtils;
-import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -32,10 +33,13 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.MetadataBlock;
 import org.apache.pinot.common.datatable.StatMap;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
+import org.apache.pinot.common.proto.Mailbox.MailboxStatus;
 import org.apache.pinot.common.proto.PinotMailboxGrpc;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
 import org.apache.pinot.query.mailbox.channel.ChannelUtils;
@@ -56,6 +60,10 @@ import org.slf4j.LoggerFactory;
 
 /**
  * gRPC implementation of the {@link SendingMailbox}. The gRPC stream is 
created on the first call to {@link #send}.
+ *
+ * <p>Thread-safety: {@code _readyLock} serializes every call to {@code 
_contentObserver.onNext} / {@code onCompleted}.
+ * The underlying {@link ClientCallStreamObserver} is not thread-safe, and the 
sender, cancel, and EOS paths can run
+ * concurrently. Acquire {@code _readyLock} around any new outbound call site 
you add.
  */
 public class GrpcSendingMailbox implements SendingMailbox {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcSendingMailbox.class);
@@ -70,19 +78,36 @@ public class GrpcSendingMailbox implements SendingMailbox {
   private final StatMap<MailboxSendOperator.StatKey> _statMap;
   private final MailboxStatusObserver _statusObserver = new 
MailboxStatusObserver();
   private final int _maxByteStringSize;
+  /// Kill-switch for the sender-side `isReady()` gate. When `false`, 
`awaitReady` short-circuits like the bypass
+  /// path and the sender pushes unconditionally — restoring the pre-1.6 
behaviour. Plumbed from
+  /// `pinot.query.runner.grpc.sender.backpressure.enabled` so it can be 
flipped without code changes if the gate
+  /// causes a regression in production, and also used by 
`BenchmarkGrpcMailboxSend` for A/B measurements.
+  private final boolean _backpressureEnabled;
   /// Indicates whether the sending side has attempted to close the mailbox 
(either via complete() or cancel()).
   private volatile boolean _senderSideClosed;
 
-  private StreamObserver<MailboxContent> _contentObserver;
+  /// Guards [#_readyCond] and serializes outbound calls on [#_contentObserver]. The observer field is normally
+  /// written once by the sending thread on its first call to [#sendInternal]; it is declared `volatile` because
+  /// [#cancel] and [#close] can read it from a different thread (e.g. an external cancel from an OpChain on-failure
+  /// callback or a watchdog in tests), and we need a happens-before edge for the sender's lazy initialization.
+  private final ReentrantLock _readyLock = new ReentrantLock();
+  /// Signalled whenever any of the predicates `awaitReady()` waits on may 
have changed: the gRPC outbound becomes
+  /// ready, the receiver acknowledges a chunk, the receiver-side stream 
closes (success or error), or the sender
+  /// itself is cancelled. Multiple producers fire the signal; the waiter 
always re-checks the predicates after
+  /// waking up.
+  private final Condition _readyCond = _readyLock.newCondition();
+
+  private volatile ClientCallStreamObserver<MailboxContent> _contentObserver;
 
   public GrpcSendingMailbox(String id, ChannelManager channelManager, String 
hostname, int port, long deadlineMs,
-      StatMap<MailboxSendOperator.StatKey> statMap, int maxInboundMessageSize) 
{
+      StatMap<MailboxSendOperator.StatKey> statMap, int maxInboundMessageSize, 
boolean backpressureEnabled) {
     _id = id;
     _channelManager = channelManager;
     _hostname = hostname;
     _port = port;
     _deadlineMs = deadlineMs;
     _statMap = statMap;
+    _backpressureEnabled = backpressureEnabled;
     // TODO: tune the maxByteStringSize based on experiments. We know the 
maxInboundMessageSize on the receiver side,
     //  but we want to leave some room for extra stuff for other fields like 
metadata, mailbox id, etc, whose size
     //  we don't know at the time of writing into the stream as it is 
serialized by protobuf.
@@ -102,10 +127,52 @@ public class GrpcSendingMailbox implements SendingMailbox 
{
 
   @Override
   public void send(MseBlock.Eos block, List<DataBuffer> serializedStats) {
-    if (sendInternal(block, serializedStats)) {
-      LOGGER.debug("Completing mailbox: {}", _id);
-      _contentObserver.onCompleted();
+    // EOS blocks (success or error) carry control-plane info — including the 
original error code on the error path —
+    // and must always reach the receiver, so they bypass the back-pressure 
gate. Bypassing also disables the
+    // cooperative termination poll inside [#awaitReady]; without it, a 
terminate signal raised while the sender is
+    // mid-way through pushing an error EOS would unwind [#sendInternal] with 
a TerminationException, leave
+    // [#_senderSideClosed] false, and let [#cancel] run and overwrite the 
original error code with
+    // QUERY_CANCELLATION on the receiver side.
+    //
+    // Race against a concurrent [#cancel]: both paths perform an early 
`isTerminated()` check before taking
+    // [#_readyLock], so two threads can both pass that check and enter their 
respective lock sections. The cancel
+    // path acquires the lock first, pushes its own error EOS payload (under 
`bypassReady=true`) and calls
+    // `_contentObserver.onCompleted()` — the gRPC client stream is now 
half-closed. When this thread then enters
+    // its own [#sendInternal] -> [#processAndSend] -> [#sendContent] under 
the lock, the in-lock
+    // `isTerminated()` re-check is skipped because `bypassReady=true`, so 
`_contentObserver.onNext()` is invoked on
+    // an already-half-closed observer and raises `IllegalStateException("call 
already half-closed")`. The
+    // subsequent `_contentObserver.onCompleted()` would raise the same error. 
We wrap the whole EOS push +
+    // half-close in a `try/catch(IllegalStateException)` and treat it as a 
benign duplicate close — the cancel
+    // path has already delivered an EOS, so the receiver will still get a 
terminal signal. Matches the
+    // defensive `catch(Exception)` in [#cancel] for the symmetric edge.
+    boolean sent;
+    try {
+      sent = sendInternal(block, serializedStats, /* bypassReady */ true);
+    } catch (IllegalStateException e) {
+      // Concurrent cancel won the race and half-closed the stream while we 
were pushing the EOS payload.
+      LOGGER.debug("EOS send raced with cancel on already half-closed stream 
for mailbox: {}", _id, e);
+      // Make sure isTerminated() observes us as closed so any downstream 
caller doesn't keep trying.
       _senderSideClosed = true;
+      return;
+    }
+    if (sent) {
+      LOGGER.debug("Completing mailbox: {}", _id);
+      // _readyLock serializes outbound observer calls. Holding it across the 
flag write + onCompleted() makes the
+      // half-close atomic with respect to any racing sendContent() / 
cancel(), so two threads cannot call onNext()
+      // / onCompleted() on the same non-thread-safe ClientCallStreamObserver. 
Setting _senderSideClosed before
+      // onCompleted() also means a sendContent() that observes isTerminated() 
under the lock will skip its onNext().
+      _readyLock.lock();
+      try {
+        _senderSideClosed = true;
+        _contentObserver.onCompleted();
+      } catch (IllegalStateException e) {
+        // The cancel path may have raced through its own onCompleted() 
between our sendContent() returning and our
+        // acquiring _readyLock here (cancel takes its own lock after we 
release ours in sendContent's finally).
+        // Treat the duplicate half-close as benign — same shape as the cancel 
path's catch(Exception).
+        LOGGER.debug("EOS half-close raced with cancel on already half-closed 
stream for mailbox: {}", _id, e);
+      } finally {
+        _readyLock.unlock();
+      }
     } else {
       LOGGER.warn("Trying to send EOS to the already terminated mailbox: {}", 
_id);
     }
@@ -113,6 +180,10 @@ public class GrpcSendingMailbox implements SendingMailbox {
 
   /// Tries to send the block to the receiver. Returns true if the block is 
sent, false otherwise.
   private boolean sendInternal(MseBlock block, List<DataBuffer> 
serializedStats) {
+    return sendInternal(block, serializedStats, /* bypassReady */ false);
+  }
+
+  private boolean sendInternal(MseBlock block, List<DataBuffer> 
serializedStats, boolean bypassReady) {
     if (isTerminated() || (isEarlyTerminated() && block.isData())) {
       LOGGER.debug("==[GRPC SEND]== terminated or early terminated mailbox. 
Skipping sending message {} to: {}",
           block, _id);
@@ -121,11 +192,9 @@ public class GrpcSendingMailbox implements SendingMailbox {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("==[GRPC SEND]== sending message " + block + " to: " + _id);
     }
-    if (_contentObserver == null) {
-      _contentObserver = getContentObserver();
-    }
+    ensureContentObserverInitialized();
     try {
-      processAndSend(block, serializedStats);
+      processAndSend(block, serializedStats, bypassReady);
     } catch (IOException e) {
       throw new QueryException(QueryErrorCode.INTERNAL, "Failed to split and 
send mailbox", e);
     }
@@ -137,11 +206,19 @@ public class GrpcSendingMailbox implements SendingMailbox 
{
 
   private void processAndSend(MseBlock block, List<DataBuffer> serializedStats)
       throws IOException {
+    processAndSend(block, serializedStats, false);
+  }
+
+  /// Same as [#processAndSend(MseBlock, List)] but with a flag to bypass the 
[#awaitReady] gate. Used by the
+  /// cancel / close paths to push the error EOS through without waiting on 
back-pressure relief — without this,
+  /// a cancel issued while the receiver is congested would itself block until 
the receiver drains.
+  private void processAndSend(MseBlock block, List<DataBuffer> 
serializedStats, boolean bypassReady)
+      throws IOException {
     _statMap.merge(MailboxSendOperator.StatKey.RAW_MESSAGES, 1);
     long start = System.currentTimeMillis();
     try {
       DataBlock dataBlock = MseBlockSerializer.toDataBlock(block, 
serializedStats);
-      int sizeInBytes = processAndSend(dataBlock);
+      int sizeInBytes = processAndSend(dataBlock, bypassReady);
       if (LOGGER.isDebugEnabled()) {
         LOGGER.debug("Serialized block: {} to {} bytes", block, sizeInBytes);
       }
@@ -157,6 +234,11 @@ public class GrpcSendingMailbox implements SendingMailbox {
    */
   protected int processAndSend(DataBlock dataBlock)
       throws IOException {
+    return processAndSend(dataBlock, false);
+  }
+
+  protected int processAndSend(DataBlock dataBlock, boolean bypassReady)
+      throws IOException {
     List<ByteString> byteStrings = toByteStrings(dataBlock, 
_maxByteStringSize);
     int sizeInBytes = 0;
     for (ByteString byteString : byteStrings) {
@@ -166,11 +248,29 @@ public class GrpcSendingMailbox implements SendingMailbox 
{
     while (byteStringIt.hasNext()) {
       ByteString byteString = byteStringIt.next();
       boolean waitForMore = byteStringIt.hasNext();
-      sendContent(byteString, waitForMore);
+      sendContent(byteString, waitForMore, bypassReady);
     }
     return sizeInBytes;
   }
 
+  /// Cancels this mailbox by pushing an error EOS to the receiver and closing 
the gRPC stream.
+  ///
+  /// ## Known limitation: in-band EOS on a stuck receiver
+  ///
+  /// The error EOS is pushed **in-band** on the same gRPC stream as data, via
+  /// [#processAndSend] with `bypassReady=true`. If the receiver's application 
queue
+  /// ([org.apache.pinot.query.mailbox.ReceivingMailbox], default capacity 5) 
is full and its dispatch
+  /// thread is parked in `_notFull.await`, the EOS sits behind every other 
inbound message that already
+  /// made it past gRPC's inbound flow-control window. Worst-case 
cancel-propagation latency is bounded by
+  /// the receiver's inbound credit
+  /// 
([org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner#KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT])
+  /// and 
[org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner#KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES].
+  ///
+  /// This is **pre-existing** behaviour — the hang surface existed even with 
gRPC's auto-inbound default of
+  /// 1 in-flight message, and survives the rollback knob
+  /// 
[org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner#KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED]
+  /// = `false`. The proper fix is an out-of-band cancel channel; see
+  /// https://github.com/apache/pinot/issues/18541.
   @Override
   public void cancel(Throwable t) {
     if (isTerminated()) {
@@ -178,20 +278,32 @@ public class GrpcSendingMailbox implements SendingMailbox 
{
       return;
     }
     _senderSideClosed = true;
+    // Wake any sender thread blocked in awaitReady() so it observes the 
termination and exits without racing this
+    // cancel path on the same observer.
+    wakeWaiters();
     LOGGER.debug("Cancelling mailbox: {}", _id);
-    if (_contentObserver == null) {
-      _contentObserver = getContentObserver();
-    }
+    // Sender thread may never have created the stream (e.g. cancel arrived 
before the first send). Open one now so
+    // the receiver gets an explicit cancel-error EOS instead of waiting for 
its own deadline — the receiving
+    // mailbox is registered per the dispatch plan and is blocked on this 
stream. Lazy init is load-bearing: an eager
+    // init in the constructor would change cancel-before-first-send semantics 
(we would always open a stream even
+    // when send() is never called). Double-checked-lock through _readyLock so 
a concurrent first-send + cancel from
+    // two threads do not both call getContentObserver() and leak an orphan 
gRPC stream.
+    ensureContentObserverInitialized();
+    // Acquire _readyLock so the error EOS + onCompleted() is one atomic 
outbound. ReentrantLock: the inner
+    // sendContent()'s acquisition nests safely.
+    _readyLock.lock();
     try {
       String msg = t != null ? t.getMessage() : "Unknown";
       // NOTE: DO NOT use onError() because it will terminate the stream, and 
receiver might not get the callback
       MseBlock errorBlock = ErrorMseBlock.fromError(
           QueryErrorCode.QUERY_CANCELLATION, "Cancelled by sender with 
exception: " + msg);
-      processAndSend(errorBlock, List.of());
+      processAndSend(errorBlock, List.of(), /* bypassReady */ true);
       _contentObserver.onCompleted();
     } catch (Exception e) {
       // Exception can be thrown if the stream is already closed, so we simply 
ignore it
       LOGGER.debug("Caught exception cancelling mailbox: {}", _id, e);
+    } finally {
+      _readyLock.unlock();
     }
   }
 
@@ -208,23 +320,204 @@ public class GrpcSendingMailbox implements 
SendingMailbox {
     return _senderSideClosed || _statusObserver.isFinished();
   }
 
-  private StreamObserver<MailboxContent> getContentObserver() {
+  /// Lazily initializes [#_contentObserver] under [#_readyLock] using the 
standard double-checked-locking idiom.
+  ///
+  /// Lazy init is load-bearing for cancel-before-first-send semantics: we do 
not want to open a gRPC stream in the
+  /// constructor because a mailbox that is cancelled before any [#send] would 
then leak an unused stream. Eager init
+  /// is therefore not an option.
+  ///
+  /// The unsynchronized `if (_contentObserver == null) { _contentObserver = 
getContentObserver(); }` pattern this
+  /// replaces was unsafe: two threads (e.g. the sender thread doing its first 
[#sendInternal] and an external
+  /// canceller running [#cancel]) could both observe `null`, both call 
[#getContentObserver], and each open a
+  /// separate gRPC stream for the same mailbox id. The loser would never see 
`onCompleted` and the orphan stream
+  /// would only be reclaimed by the gRPC deadline — and the cancel EOS could 
land on a different stream than the
+  /// data, silently losing the cancellation signal.
+  ///
+  /// `_contentObserver` is `volatile` so the fast-path check provides the 
happens-before edge for the eventual
+  /// observer reads in [#sendContent] / [#cancel].
+  private void ensureContentObserverInitialized() {
+    if (_contentObserver != null) {
+      return;
+    }
+    _readyLock.lock();
+    try {
+      if (_contentObserver == null) {
+        _contentObserver = getContentObserver();
+      }
+    } finally {
+      _readyLock.unlock();
+    }
+  }
+
+  // Package-private to allow regression tests for the lazy-init race to count 
how many times a content observer is
+  // opened for a single mailbox.
+  ClientCallStreamObserver<MailboxContent> getContentObserver() {
     Metadata metadata = new Metadata();
     metadata.put(ChannelUtils.MAILBOX_ID_METADATA_KEY, _id);
 
-    return PinotMailboxGrpc.newStub(_channelManager.getChannel(_hostname, 
_port))
+    // We wrap `_statusObserver` in a ClientResponseObserver so we can 
register the on-ready handler through
+    // `beforeStart` — gRPC rejects setOnReadyHandler() if it is called after 
open() returns. Wrapping (rather than
+    // making MailboxStatusObserver itself a ClientResponseObserver) keeps the 
back-pressure plumbing local to this
+    // class. The wrapper delegates the data callbacks unchanged, and signals 
our `_readyCond` on stream close so a
+    // blocked sender wakes up to observe `_statusObserver.isFinished()` 
becoming true.
+    ClientResponseObserver<MailboxContent, MailboxStatus> responseObserver =
+        new ClientResponseObserver<MailboxContent, MailboxStatus>() {
+          @Override
+          public void beforeStart(ClientCallStreamObserver<MailboxContent> 
requestStream) {
+            // Fires on a gRPC channel/Netty thread whenever isReady() 
transitions false -> true. Just signal; the
+            // sender re-checks the predicate after waking.
+            
requestStream.setOnReadyHandler(GrpcSendingMailbox.this::wakeWaiters);
+          }
+
+          @Override
+          public void onNext(MailboxStatus value) {
+            _statusObserver.onNext(value);
+            // Only wake on receiver early-terminate. Transport-level 
isReady() transitions reach a parked
+            // sender through setOnReadyHandler (registered in beforeStart 
above); normal buffer-size ACKs
+            // do not change any predicate awaitReady() actually waits on, so 
signalling them would force a
+            // spurious park/unpark cycle on every receiver ACK. 
Early-terminate is the one status-only
+            // change (the stream stays open) that awaitReady() must observe 
promptly, so we still signal
+            // here when its metadata is set.
+            if (Boolean.parseBoolean(
+                
value.getMetadataMap().get(ChannelUtils.MAILBOX_METADATA_REQUEST_EARLY_TERMINATE)))
 {
+              wakeWaiters();
+            }
+          }
+
+          @Override
+          public void onError(Throwable t) {
+            try {
+              _statusObserver.onError(t);
+            } finally {
+              wakeWaiters();
+            }
+          }
+
+          @Override
+          public void onCompleted() {
+            try {
+              _statusObserver.onCompleted();
+            } finally {
+              wakeWaiters();
+            }
+          }
+        };
+
+    return (ClientCallStreamObserver<MailboxContent>) PinotMailboxGrpc.newStub(
+            _channelManager.getChannel(_hostname, _port))
         .withInterceptors(MetadataUtils.newAttachHeadersInterceptor(metadata))
         .withDeadlineAfter(_deadlineMs - System.currentTimeMillis(), 
TimeUnit.MILLISECONDS)
-        .open(_statusObserver);
+        .open(responseObserver);
   }
 
   protected void sendContent(ByteString byteString, boolean waitForMore) {
-    MailboxContent content = MailboxContent.newBuilder()
-        .setMailboxId(_id)
-        .setPayload(byteString)
-        .setWaitForMore(waitForMore)
-        .build();
-    _contentObserver.onNext(content);
+    sendContent(byteString, waitForMore, false);
+  }
+
+  protected void sendContent(ByteString byteString, boolean waitForMore, 
boolean bypassReady) {
+    if (!awaitReady(bypassReady)) {
+      // The mailbox is terminated or early-terminated (normal path), or the 
gRPC stream is already dead
+      // (bypass or back-pressure-off path). Either way, skip the send.
+      return;
+    }
+    // _readyLock is the serialization point for outbound observer calls. Hold 
it across the isTerminated() re-check
+    // and the onNext() so the data path cannot race with cancel() / send(Eos) 
onto the same non-thread-safe
+    // ClientCallStreamObserver. awaitReady() is intentionally outside the 
lock: its slow path already acquires
+    // _readyLock to wait on _readyCond, and acquiring before calling it would 
force the fast `isReady() == true`
+    // path to take and release the lock for no benefit. By the time 
awaitReady() returns true, the slow-path
+    // lock release happens-before this acquisition, so the visibility we need 
is in place.
+    _readyLock.lock();
+    try {
+      if (!bypassReady && isTerminated()) {
+        return;
+      }
+      MailboxContent content = MailboxContent.newBuilder()
+          .setMailboxId(_id)
+          .setPayload(byteString)
+          .setWaitForMore(waitForMore)
+          .build();
+      _contentObserver.onNext(content);
+    } finally {
+      _readyLock.unlock();
+    }
+  }
+
+  /// Blocks the calling (query-runner) thread until the gRPC client outbound 
is ready to accept another chunk, the
+  /// mailbox terminates, or the query deadline is exceeded. Returns `true` if 
the caller should proceed with the
+  /// `onNext` call, `false` if the send should be skipped.
+  ///
+  /// Two modes (with back-pressure disabled, every call behaves like the second):
+  ///  * `bypassReady = false` (normal user sends): waits for 
[#_contentObserver]`.isReady()` to flip, but exits
+  ///    early if the mailbox has been [#isTerminated terminated] in the 
meantime. This makes [#cancel] able to
+  ///    unblock a blocked sender promptly.
+  ///  * `bypassReady = true` (the cancel / close paths): never waits, never 
short-circuits on `isTerminated()`.
+  ///    The only check is whether the underlying gRPC stream is already dead 
([MailboxStatusObserver#isFinished]),
+  ///    in which case there is nothing to send. This is what lets a cancel 
issued while the receiver is congested
+  ///    push its error EOS through without blocking behind back-pressure of 
its own.
+  ///
+  /// Spool note: when a stage spools to multiple destination mailboxes via 
[BlockExchange.BlockExchangeSendingMailbox],
+  /// one slow downstream worker will throttle the whole spool — every wrapped 
mailbox is awaited in turn from the same
+  /// OpChain thread. This is intentional: the alternative (forking each 
destination onto its own thread) would
+  /// re-introduce the unbounded outbound queue we are fixing here. Spool 
throughput is gated by the slowest consumer.
+  private boolean awaitReady(boolean bypassReady) {
+    if (bypassReady || !_backpressureEnabled) {
+      return !_statusObserver.isFinished();
+    }
+    // Fast path: don't take the lock if the observer is already ready. This 
is the common case in the steady state.
+    if (_contentObserver.isReady()) {
+      return true;
+    }
+    if (isTerminated() || isEarlyTerminated()) {
+      return false;
+    }
+    _readyLock.lock();
+    try {
+      while (!_contentObserver.isReady()) {
+        if (isTerminated() || isEarlyTerminated()) {
+          return false;
+        }
+        // Cooperative termination poll so query cancellation can unblock the 
wait through the same mechanism we use
+        // elsewhere in the MSE.
+        QueryThreadContext.checkTerminationAndSampleUsage(SEND_SCOPE);
+        long remainingMs = _deadlineMs - System.currentTimeMillis();
+        if (remainingMs <= 0) {
+          throw new QueryException(QueryErrorCode.EXECUTION_TIMEOUT,
+              "Deadline exceeded while waiting for gRPC outbound to become 
ready on mailbox: " + _id);
+        }
+        try {
+          _readyCond.await(remainingMs, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new QueryException(QueryErrorCode.INTERNAL,
+              "Interrupted while waiting for gRPC outbound to become ready on 
mailbox: " + _id, e);
+        }
+      }
+      return true;
+    } finally {
+      _readyLock.unlock();
+    }
+  }
+
+  /// Wakes every waiter in [#awaitReady]. Called from the gRPC on-ready handler, 
from the response-observer wrapper
+  /// around [MailboxStatusObserver], and from [#cancel] / [#close]. Must be cheap: it can fire from 
Netty event-loop threads.
+  private void wakeWaiters() {
+    // Lock-from-event-loop pattern, intentional and load-bearing. This method 
can fire from a Netty channel /
+    // gRPC event-loop thread via the `setOnReadyHandler` registered in 
`beforeStart`, as well as from the
+    // `MailboxStatusObserver` callbacks (`onNext`, `onError`, `onCompleted`) 
— all of which run on gRPC
+    // executor threads. Acquiring `_readyLock` here therefore briefly blocks 
the event-loop thread if a sender is
+    // mid-`onNext` / `onCompleted` under the lock. That is accepted because:
+    //   (a) lock-held outbound critical sections are bounded by a single 
`_contentObserver.onNext` or
+    //       `onCompleted` call — the lock is never held across an 
`awaitReady()` wait, an I/O wait, or any other
+    //       blocking operation, so the event-loop stall is at most one 
observer call;
+    //   (b) the obvious alternative — defer the signal to a separate executor 
— adds latency to every wakeup
+    //       and an extra thread hop on the hot back-pressure unblock path, 
which matters in practice more than
+    //       the rare event-loop stall this design accepts.
+    _readyLock.lock();
+    try {
+      _readyCond.signalAll();
+    } finally {
+      _readyLock.unlock();
+    }
   }
 
   @Override
@@ -334,15 +627,42 @@ public class GrpcSendingMailbox implements SendingMailbox 
{
   public void close()
       throws Exception {
     if (!isTerminated()) {
-      String errorMsg = "Closing gPRC mailbox without proper EOS message";
+      String errorMsg = "Closing gRPC mailbox without proper EOS message";
       RuntimeException ex = new RuntimeException(errorMsg);
       ex.fillInStackTrace();
       LOGGER.error(errorMsg, ex);
       _senderSideClosed = true;
+      wakeWaiters();
+
+      // Short-circuit when the sender never opened a stream. close() is the 
"should not happen" cleanup
+      // fallback — for query stages pruned before any data flows the mailbox 
is constructed, never sent on,
+      // and then closed. Pre-PR this was a silent no-op on the wire; commit 
aeaacc893d regressed that by
+      // routing through ensureContentObserverInitialized() to push an error 
EOS, which opened a fresh gRPC
+      // stream and half-closed it (three wasted round-trips, plus a new 
exception surface if channel open
+      // throws during shutdown). cancel() does NOT short-circuit here because 
it has a load-bearing reason
+      // to open the stream: there may be a receiver-side reader blocked 
waiting for the cancel signal. close()
+      // has no such urgency. The volatile read of _contentObserver here 
happens-before the lock acquire below,
+      // so no additional fence is needed.
+      ClientCallStreamObserver<MailboxContent> observer = _contentObserver;
+      if (observer == null) {
+        return;
+      }
 
       MseBlock errorBlock = ErrorMseBlock.fromError(QueryErrorCode.INTERNAL, 
errorMsg);
-      if (_contentObserver != null) {
-        processAndSend(errorBlock, List.of());
+      // _readyLock serialises outbound observer calls so the error EOS + 
onCompleted() is one atomic outbound,
+      // same shape as cancel(). Without the onCompleted() the gRPC client 
stream stayed half-open from the sender
+      // side until the per-call deadline fired — for long-running MSE queries 
that's a multi-minute allocator pin
+      // on the channel, which is exactly the leak the back-pressure work is 
trying to fix.
+      _readyLock.lock();
+      try {
+        processAndSend(errorBlock, List.of(), /* bypassReady */ true);
+        observer.onCompleted();
+      } catch (Exception e) {
+        // Stream may already be half-closed from a racing send(Eos) / cancel 
that we lost to under the lock.
+        // Symmetric to the defensive catch in cancel() — duplicate half-close 
is benign here.
+        LOGGER.debug("Caught exception closing mailbox: {}", _id, e);
+      } finally {
+        _readyLock.unlock();
       }
     }
   }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
index 73fd1379ac4..3aa8924009e 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.query.mailbox;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalListener;
@@ -30,6 +31,10 @@ import java.util.function.Function;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.utils.grpc.ServerGrpcQueryClient;
 import org.apache.pinot.core.instance.context.BrokerContext;
 import org.apache.pinot.core.instance.context.ControllerContext;
@@ -96,6 +101,7 @@ public class MailboxService {
    * bloating is added by gRPC and protobuf.
    */
   private final int _maxInboundMessageSize;
+  private final boolean _grpcSenderBackpressureEnabled;
 
   private GrpcMailboxServer _grpcMailboxServer;
 
@@ -121,11 +127,64 @@ public class MailboxService {
         
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
         
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES
     );
-    _channelManager = new ChannelManager(_clientSslContext, 
_maxInboundMessageSize, getIdleTimeout(config));
+    _grpcSenderBackpressureEnabled = config.getProperty(
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_SENDER_BACKPRESSURE_ENABLED
+    );
+    int writeBufferHighWaterMarkBytes = config.getProperty(
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES);
+    int writeBufferLowWaterMarkBytes = config.getProperty(
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES);
+    _channelManager = new ChannelManager(_clientSslContext, 
_maxInboundMessageSize, getIdleTimeout(config),
+        writeBufferHighWaterMarkBytes, writeBufferLowWaterMarkBytes);
     _accessControlFactory = accessControlFactory;
+    registerMailboxClientGauges();
     LOGGER.info("Initialized MailboxService with hostname: {}, port: {}", 
hostname, port);
   }
 
+  /// Registers gauges exposing the memory used by the gRPC client allocator
+  /// shared by every [GrpcSendingMailbox] this service creates. The companion
+  /// gauges for the server allocator are registered in [GrpcMailboxServer].
+  ///
+  /// Notice we are wiring the shaded gRPC Netty allocator
+  /// ([io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator]) rather 
than
+  /// the non-shaded one.
+  ///
+  /// Only the two `MAILBOX_CLIENT_USED_*` gauges are mirrored on the client 
side;
+  /// the six debug-only counterparts exposed by the server 
(`MAILBOX_SERVER_ARENAS_*`,
+  /// `MAILBOX_SERVER_CACHE_SIZE_*`, `MAILBOX_SERVER_THREADLOCALCACHE`,
+  /// `MAILBOX_SERVER_CHUNK_SIZE`) are intentionally not mirrored here. The 
operational
+  /// signal that matters on the client is direct-memory pool exhaustion (the 
OOM that
+  /// motivated wiring these gauges in the first place); the rest are 
server-side
+  /// allocator-internals useful only for debugging. If you ever need to 
mirror the
+  /// remaining six, copy the registration block from [GrpcMailboxServer]'s 
constructor.
+  private void registerMailboxClientGauges() {
+    switch (_instanceType) {
+      case BROKER: {
+        BrokerMetrics brokerMetrics = BrokerMetrics.get();
+        
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_CLIENT_USED_DIRECT_MEMORY,
+            _channelManager::usedDirectMemoryBytes);
+        
brokerMetrics.setOrUpdateGlobalGauge(BrokerGauge.MAILBOX_CLIENT_USED_HEAP_MEMORY,
+            _channelManager::usedHeapMemoryBytes);
+        break;
+      }
+      case SERVER: {
+        ServerMetrics serverMetrics = ServerMetrics.get();
+        
serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_CLIENT_USED_DIRECT_MEMORY,
+            _channelManager::usedDirectMemoryBytes);
+        
serverMetrics.setOrUpdateGlobalGauge(ServerGauge.MAILBOX_CLIENT_USED_HEAP_MEMORY,
+            _channelManager::usedHeapMemoryBytes);
+        break;
+      }
+      default:
+        // Controller does not run a MailboxService in production today, but 
if one is ever
+        // started for tests we silently skip metric registration rather than 
failing.
+        break;
+    }
+  }
+
   /**
    * Starts the mailbox service.
    */
@@ -160,13 +219,16 @@ public class MailboxService {
    * not open the underlying channel or acquire any additional resources. 
Instead, it will initialize lazily when the
    * data is sent for the first time.
    */
+  // TODO(https://github.com/apache/pinot/issues/18539): add a global byte 
budget across peers; the
+  //  per-channel WriteBufferWaterMark bound is watermark.high × #peers × 
#concurrent_queries, which can
+  //  reach multiple GiB and re-trigger OutOfDirectMemoryError at large 
fan-outs.
   public SendingMailbox getSendingMailbox(String hostname, int port, String 
mailboxId, long deadlineMs,
       StatMap<MailboxSendOperator.StatKey> statMap) {
     if (_hostname.equals(hostname) && _port == port) {
       return new InMemorySendingMailbox(mailboxId, this, deadlineMs, statMap);
     } else {
       return new GrpcSendingMailbox(mailboxId, _channelManager, hostname, 
port, deadlineMs, statMap,
-          _maxInboundMessageSize);
+          _maxInboundMessageSize, _grpcSenderBackpressureEnabled);
     }
   }
 
@@ -190,6 +252,41 @@ public class MailboxService {
     return _channelManager.resetConnectBackoff(hostname, port);
   }
 
+  /// Current value of [BrokerGauge#MAILBOX_CLIENT_USED_DIRECT_MEMORY] /
+  /// [ServerGauge#MAILBOX_CLIENT_USED_DIRECT_MEMORY] — bytes pinned by the
+  /// shared gRPC client allocator backing every [GrpcSendingMailbox] created
+  /// from this service.
+  @VisibleForTesting
+  public long getMailboxClientUsedDirectMemoryBytes() {
+    return _channelManager.usedDirectMemoryBytes();
+  }
+
+  /// Current value of [BrokerGauge#MAILBOX_CLIENT_USED_HEAP_MEMORY] /
+  /// [ServerGauge#MAILBOX_CLIENT_USED_HEAP_MEMORY].
+  @VisibleForTesting
+  public long getMailboxClientUsedHeapMemoryBytes() {
+    return _channelManager.usedHeapMemoryBytes();
+  }
+
+  /// Current value of [BrokerGauge#MAILBOX_SERVER_USED_DIRECT_MEMORY] /
+  /// [ServerGauge#MAILBOX_SERVER_USED_DIRECT_MEMORY] — bytes pinned by the
+  /// gRPC server allocator handling inbound mailbox traffic.
+  ///
+  /// Returns 0 before [#start] is called (the gRPC server is built lazily 
there).
+  @VisibleForTesting
+  public long getMailboxServerUsedDirectMemoryBytes() {
+    return _grpcMailboxServer != null ? 
_grpcMailboxServer.usedDirectMemoryBytes() : 0L;
+  }
+
+  /// Current value of [BrokerGauge#MAILBOX_SERVER_USED_HEAP_MEMORY] /
+  /// [ServerGauge#MAILBOX_SERVER_USED_HEAP_MEMORY].
+  ///
+  /// Returns 0 before [#start] is called.
+  @VisibleForTesting
+  public long getMailboxServerUsedHeapMemoryBytes() {
+    return _grpcMailboxServer != null ? 
_grpcMailboxServer.usedHeapMemoryBytes() : 0L;
+  }
+
   /**
    * Releases the receiving mailbox from the cache.
    *
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
index bdccf59a6bf..f7567fe1951 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java
@@ -18,11 +18,13 @@
  */
 package org.apache.pinot.query.mailbox.channel;
 
+import com.google.common.base.Preconditions;
 import io.grpc.ConnectivityState;
 import io.grpc.ManagedChannel;
 import io.grpc.netty.shaded.io.grpc.netty.NettyChannelBuilder;
 import io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator;
 import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
+import io.grpc.netty.shaded.io.netty.channel.WriteBufferWaterMark;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
 import java.time.Duration;
 import java.util.concurrent.ConcurrentHashMap;
@@ -62,6 +64,7 @@ public class ChannelManager {
   private final PooledByteBufAllocator _bufAllocator;
   @Nullable
   private final SslContext _clientSslContext;
+  private final WriteBufferWaterMark _writeBufferWaterMark;
 
   /**
    * Constructs a {@code ChannelManager}.
@@ -69,11 +72,26 @@ public class ChannelManager {
    * @param clientSslContext optional cached client {@link SslContext} to 
reuse across channels
    * @param maxInboundMessageSize maximum inbound message size for gRPC 
channels
    * @param idleTimeout idle timeout for gRPC channels; channels close after 
this period of inactivity
+   * @param writeBufferHighWaterMarkBytes Netty per-channel {@link 
WriteBufferWaterMark} high watermark. This limit is
+   *                                     per {@code (host, port)} peer and is 
shared across all streams multiplexed on
+   *                                     that channel.
+   * @param writeBufferLowWaterMarkBytes Netty per-channel {@link 
WriteBufferWaterMark} low watermark. Once the channel's
+   *                                     pending write queue grows above the 
high watermark, the channel is marked
+   *                                     unwritable; it becomes writable again 
only when the queue drains below this
+   *                                     low watermark. Must satisfy {@code 0 
< low ≤ high}; validated eagerly here so
+   *                                     misconfiguration surfaces at startup 
rather than on the first query.
    */
-  public ChannelManager(@Nullable SslContext clientSslContext, int 
maxInboundMessageSize, Duration idleTimeout) {
+  public ChannelManager(@Nullable SslContext clientSslContext, int 
maxInboundMessageSize, Duration idleTimeout,
+      int writeBufferHighWaterMarkBytes, int writeBufferLowWaterMarkBytes) {
     _clientSslContext = clientSslContext;
     _maxInboundMessageSize = maxInboundMessageSize;
     _idleTimeout = idleTimeout;
+    Preconditions.checkArgument(writeBufferLowWaterMarkBytes > 0,
+        "writeBufferLowWaterMarkBytes must be positive, got: %s", 
writeBufferLowWaterMarkBytes);
+    // The `low <= high` (and `low >= 0`) invariant is also checked by Netty's 
WriteBufferWaterMark constructor; by
+    // constructing the watermark eagerly here we surface any violation at 
startup instead of on the first send to
+    // a previously-unseen peer.
+    _writeBufferWaterMark = new 
WriteBufferWaterMark(writeBufferLowWaterMarkBytes, 
writeBufferHighWaterMarkBytes);
     // Use direct buffers (off-heap) for better performance - matches 
server-side configuration
     _bufAllocator = new PooledByteBufAllocator(true);
   }
@@ -86,6 +104,7 @@ public class ChannelManager {
                 .forAddress(k.getLeft(), k.getRight())
                 .maxInboundMessageSize(_maxInboundMessageSize)
                 .withOption(ChannelOption.ALLOCATOR, _bufAllocator)
+                .withOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
_writeBufferWaterMark)
                 .sslContext(_clientSslContext);
             return decorate(channelBuilder).build();
           }
@@ -97,6 +116,7 @@ public class ChannelManager {
                 .forAddress(k.getLeft(), k.getRight())
                 .maxInboundMessageSize(_maxInboundMessageSize)
                 .withOption(ChannelOption.ALLOCATOR, _bufAllocator)
+                .withOption(ChannelOption.WRITE_BUFFER_WATER_MARK, 
_writeBufferWaterMark)
                 .usePlaintext();
             return decorate(channelBuilder).build();
           });
@@ -122,4 +142,20 @@ public class ChannelManager {
   private NettyChannelBuilder decorate(NettyChannelBuilder builder) {
     return builder.idleTimeout(_idleTimeout.getSeconds(), TimeUnit.SECONDS);
   }
+
+  /// Bytes of direct (off-heap) memory currently pinned by the shared gRPC
+  /// client allocator. Covers every channel managed by this instance and 
remains
+  /// meaningful regardless of whether Netty is configured to prefer direct or
+  /// heap buffers (e.g. `-Dio.netty.noPreferDirect=true`). Consumed by
+  /// [MailboxService] to register the `MAILBOX_CLIENT_USED_DIRECT_MEMORY` 
gauge.
+  public long usedDirectMemoryBytes() {
+    return _bufAllocator.metric().usedDirectMemory();
+  }
+
+  /// Bytes of heap memory currently pinned by the shared gRPC client 
allocator.
+  /// Consumed by [MailboxService] to register the
+  /// `MAILBOX_CLIENT_USED_HEAP_MEMORY` gauge.
+  public long usedHeapMemoryBytes() {
+    return _bufAllocator.metric().usedHeapMemory();
+  }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
index 2a9264ceada..893c4b8d212 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServer.java
@@ -26,6 +26,7 @@ import 
io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocatorMetric;
 import io.grpc.netty.shaded.io.netty.channel.ChannelOption;
 import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
 import io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent;
+import io.grpc.stub.ServerCallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import java.io.IOException;
 import java.util.concurrent.TimeUnit;
@@ -44,6 +45,8 @@ import org.apache.pinot.query.mailbox.MailboxService;
 import org.apache.pinot.spi.config.instance.InstanceType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.utils.CommonConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -53,10 +56,15 @@ import org.apache.pinot.spi.utils.CommonConstants;
  * send by the sender of the sender/receiver pair.
  */
 public class GrpcMailboxServer extends PinotMailboxGrpc.PinotMailboxImplBase {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(GrpcMailboxServer.class);
   private static final long DEFAULT_SHUTDOWN_TIMEOUT_MS = 10_000L;
 
   private final MailboxService _mailboxService;
   private final Server _server;
+  private final PooledByteBufAllocatorMetric _bufAllocatorMetric;
+  private final int _flowControlWindowBytes;
+  private final int _inboundMessageCredit;
+  private final boolean _manualInboundFlowControlEnabled;
 
   /**
    * Constructs a gRPC-based mailbox server.
@@ -79,6 +87,7 @@ public class GrpcMailboxServer extends 
PinotMailboxGrpc.PinotMailboxImplBase {
 
     PooledByteBufAllocator bufAllocator = new PooledByteBufAllocator(true);
     PooledByteBufAllocatorMetric metric = bufAllocator.metric();
+    _bufAllocatorMetric = metric;
 
     // Register memory metrics based on instance type
     InstanceType instanceType = mailboxService.getInstanceType();
@@ -132,13 +141,35 @@ public class GrpcMailboxServer extends 
PinotMailboxGrpc.PinotMailboxImplBase {
     if (accessControlFactory != null) {
       builder.intercept(new AuthorizationInterceptor(accessControlFactory));
     }
+    _flowControlWindowBytes = config.getProperty(
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_FLOW_CONTROL_WINDOW_BYTES);
+    _inboundMessageCredit = config.getProperty(
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_INBOUND_MESSAGE_CREDIT);
+    _manualInboundFlowControlEnabled = config.getProperty(
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED);
+    int maxInboundMessageSize = config.getProperty(
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES);
+    Preconditions.checkArgument(_inboundMessageCredit > 0,
+        "%s must be positive, got: %s",
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT,
+        _inboundMessageCredit);
+    // A window smaller than the largest possible single message makes the 
stream pathological: the sender's
+    // isReady() flaps on every message, since no single message can ever fit 
in the available credit. Fail fast
+    // at startup rather than letting it manifest as flapping back-pressure 
mid-query.
+    Preconditions.checkArgument(_flowControlWindowBytes >= 
maxInboundMessageSize,
+        "%s (%s) must be >= %s (%s)",
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES, 
_flowControlWindowBytes,
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
 maxInboundMessageSize);
     builder
         .addService(this)
         .withOption(ChannelOption.ALLOCATOR, bufAllocator)
         .withChildOption(ChannelOption.ALLOCATOR, bufAllocator)
-        .maxInboundMessageSize(config.getProperty(
-            
CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
-            
CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES));
+        .maxInboundMessageSize(maxInboundMessageSize)
+        .flowControlWindow(_flowControlWindowBytes);
 
     // Add SSL context only if TLS is configured
     if (tlsConfig != null) {
@@ -151,6 +182,9 @@ public class GrpcMailboxServer extends 
PinotMailboxGrpc.PinotMailboxImplBase {
   }
 
   public void start() {
+    LOGGER.info("Starting GrpcMailboxServer with flowControlWindow={} bytes, 
inboundMessageCredit={}, "
+            + "manualInboundFlowControlEnabled={}",
+        _flowControlWindowBytes, _inboundMessageCredit, 
_manualInboundFlowControlEnabled);
     try {
       _server.start();
     } catch (IOException e) {
@@ -166,9 +200,35 @@ public class GrpcMailboxServer extends 
PinotMailboxGrpc.PinotMailboxImplBase {
     }
   }
 
+  /// Bytes of direct (off-heap) memory currently pinned by the gRPC server
+  /// allocator backing this mailbox server. This is the same allocator whose
+  /// values are exported as `MAILBOX_SERVER_USED_DIRECT_MEMORY` gauges.
+  public long usedDirectMemoryBytes() {
+    return _bufAllocatorMetric.usedDirectMemory();
+  }
+
+  /// Bytes of heap memory currently pinned by the gRPC server allocator 
backing
+  /// this mailbox server. Exported as `MAILBOX_SERVER_USED_HEAP_MEMORY` 
gauges.
+  public long usedHeapMemoryBytes() {
+    return _bufAllocatorMetric.usedHeapMemory();
+  }
+
   @Override
   public StreamObserver<Mailbox.MailboxContent> 
open(StreamObserver<Mailbox.MailboxStatus> responseObserver) {
     String mailboxId = ChannelUtils.MAILBOX_ID_CTX_KEY.get();
-    return new MailboxContentObserver(_mailboxService, mailboxId, 
responseObserver);
+    ServerCallStreamObserver<Mailbox.MailboxStatus> serverCallObserver =
+        (ServerCallStreamObserver<Mailbox.MailboxStatus>) responseObserver;
+    if (_manualInboundFlowControlEnabled) {
+      // Manual inbound flow control: override gRPC's auto-inbound (which 
calls request(1) after each
+      // onNext) and prefetch _inboundMessageCredit messages up-front. 
MailboxContentObserver.onNext will
+      // then replenish one credit at the top of each call so the in-flight 
window stays full while the
+      // application drains. This is the primary throughput knob for 
small/medium MSE blocks.
+      serverCallObserver.disableAutoInboundFlowControl();
+      serverCallObserver.request(_inboundMessageCredit);
+    }
+    // Else: leave gRPC's auto-inbound in place — only 1 message in flight at 
a time. This is the pre-PR
+    // behaviour, retained as a rollback knob via 
KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED.
+    return new MailboxContentObserver(_mailboxService, mailboxId, 
serverCallObserver,
+        _manualInboundFlowControlEnabled);
   }
 }
diff --git 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
index 9bb33e2f918..e5088bc8c47 100644
--- 
a/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
+++ 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserver.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.mailbox.channel;
 
 import io.grpc.Context;
+import io.grpc.stub.ServerCallStreamObserver;
 import io.grpc.stub.StreamObserver;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -47,21 +48,35 @@ public class MailboxContentObserver implements 
StreamObserver<MailboxContent> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(MailboxContentObserver.class);
 
   private final MailboxService _mailboxService;
-  private final StreamObserver<MailboxStatus> _responseObserver;
+  private final ServerCallStreamObserver<MailboxStatus> _responseObserver;
+  private final boolean _manualInboundFlowControlEnabled;
   private final List<ByteBuffer> _mailboxBuffers = 
Collections.synchronizedList(new ArrayList<>());
   private boolean _closedStream = false;
 
   private volatile ReceivingMailbox _mailbox;
 
   public MailboxContentObserver(MailboxService mailboxService, String 
mailboxId,
-      StreamObserver<MailboxStatus> responseObserver) {
+      ServerCallStreamObserver<MailboxStatus> responseObserver, boolean 
manualInboundFlowControlEnabled) {
     _mailboxService = mailboxService;
     _responseObserver = responseObserver;
+    _manualInboundFlowControlEnabled = manualInboundFlowControlEnabled;
     _mailbox = StringUtils.isNotBlank(mailboxId) ? 
_mailboxService.getReceivingMailbox(mailboxId) : null;
   }
 
   @Override
   public void onNext(MailboxContent mailboxContent) {
+    if (_manualInboundFlowControlEnabled) {
+      // Replenish one inbound-message credit immediately, before any work 
that might block (e.g., the
+      // offerData lock acquisition inside _mailbox.offerRaw). This decouples 
the sender's HTTP/2 window
+      // replenishment from the receiver's per-message processing time — gRPC 
will issue the WINDOW_UPDATE
+      // for this message as soon as this request(1) call sets the credit, not 
waiting for onNext to return.
+      // Do not move this below the blocking _mailbox.offerRaw call — it must 
replenish credit
+      // BEFORE the offer so the receiver doesn't gate the sender on 
per-message application drain time.
+      _responseObserver.request(1);
+    }
+    // Else: gRPC's auto-inbound is in place and will automatically replenish 
1 credit after onNext
+    // returns. Do not call request(1) here in that mode — it would 
double-count the credit and break
+    // the 1-in-flight semantics that the rollback knob restores.
     if (_closedStream) {
       LOGGER.debug("Received a late message once the stream was closed. 
Ignoring it.");
       return;
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureOffPathTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureOffPathTest.java
new file mode 100644
index 00000000000..43ff4bb63b8
--- /dev/null
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureOffPathTest.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+/// Pins the off-path through `awaitReady`, which is also the production 
default for
+/// `pinot.query.runner.grpc.sender.backpressure.enabled` (see
+/// 
[org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner#DEFAULT_GRPC_SENDER_BACKPRESSURE_ENABLED]).
+///
+/// ## What this test checks
+///
+/// When the flag is set to `false` (the default), 
[GrpcSendingMailbox#awaitReady] must short-circuit
+/// immediately — i.e. the `!_backpressureEnabled` branch in the guard must 
fire — so the sender pushes
+/// blocks without waiting for the receiver to drain them. Under that 
condition a fast sender can push
+/// orders of magnitude more blocks than a slow receiver polls in the same 
wall-clock window.
+///
+/// ## Regression risk
+///
+/// If someone accidentally removes or inverts the `bypassReady || 
!_backpressureEnabled` short-circuit
+/// in `awaitReady`, the gate would become permanently active and this flag 
would have no effect. The
+/// test would then see `sendCount` track `polledCount` closely (as in the 
*opt-in* companion test),
+/// causing the `sendCount > polledCount * 10` assertion to fail and surfacing 
the regression in CI.
+///
+/// ## Relation to the companion tests
+///
+/// [GrpcSenderBackpressureTest] verifies the opt-in (`enabled = true`) path 
with wide transport
+/// defaults — that the gate keeps the sender in check. 
[GrpcSenderBackpressureTightGateTest] does the
+/// same with narrow transport so the gate is the dominant back-pressure 
mechanism. This test verifies
+/// the off path — that turning the gate off (or leaving it at the production 
default) actually removes
+/// it. Together they pin both sides of the boolean.
+public class GrpcSenderBackpressureOffPathTest {
+  private static final DataSchema SCHEMA = new DataSchema(
+      new String[]{"payload"}, new ColumnDataType[]{ColumnDataType.STRING});
+  // Same small payload as the companion test — enough to have a real 
serialized
+  // body but small enough not to exhaust direct memory within the 3-second 
budget.
+  private static final String PAYLOAD = "x".repeat(128);
+  private static final long SEND_BUDGET_NS = TimeUnit.SECONDS.toNanos(3);
+  private static final long READER_POLL_INTERVAL_MS = 20;
+
+  private MailboxService _senderService;
+  private MailboxService _receiverService;
+
+  @BeforeClass
+  public void setUp() {
+    // Disable the back-pressure gate so awaitReady short-circuits 
unconditionally.
+    PinotConfiguration config = new PinotConfiguration(Map.of(
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED, 
false));
+    _senderService = new MailboxService("localhost", 
QueryTestUtils.getAvailablePort(),
+        InstanceType.SERVER, config);
+    _senderService.start();
+    _receiverService = new MailboxService("localhost", 
QueryTestUtils.getAvailablePort(),
+        InstanceType.SERVER, config);
+    _receiverService.start();
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _senderService.shutdown();
+    _receiverService.shutdown();
+  }
+
+  @Test
+  public void killSwitchDisablesBackpressureGate()
+      throws Exception {
+    String mailboxId = MailboxIdUtils.toMailboxId(1, 1, 0, 0, 0);
+    long deadlineMs = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(5);
+    StatMap<MailboxSendOperator.StatKey> stats =
+        new StatMap<>(MailboxSendOperator.StatKey.class);
+
+    SendingMailbox sender = _senderService.getSendingMailbox(
+        "localhost", _receiverService.getPort(), mailboxId, deadlineMs, stats);
+    ReceivingMailbox receiver = 
_receiverService.getReceivingMailbox(mailboxId);
+    receiver.registeredReader(() -> { });
+
+    AtomicBoolean stop = new AtomicBoolean(false);
+    AtomicLong polled = new AtomicLong();
+    Thread slowReader = new Thread(() -> {
+      while (!stop.get()) {
+        ReceivingMailbox.MseBlockWithStats msg = receiver.poll();
+        if (msg != null && !msg.getBlock().isEos()) {
+          polled.incrementAndGet();
+        }
+        sleepQuiet(READER_POLL_INTERVAL_MS);
+      }
+    }, "slow-reader");
+    slowReader.setDaemon(true);
+    slowReader.start();
+
+    // Same instance, sent over and over.
+    RowHeapDataBlock block = OperatorTestUtil.block(SCHEMA, new 
Object[]{PAYLOAD});
+
+    int sendCount = 0;
+
+    // Watchdog: with the gate off the sender races ahead without blocking; we 
still
+    // need a watchdog to cap wall-clock time and prevent the loop from filling
+    // direct memory before the test completes. The cancel wakes any thread 
that
+    // happens to be inside awaitReady (shouldn't be any with the gate off, but
+    // it also covers the isTerminated() check in the send loop).
+    ScheduledExecutorService watchdog = 
Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "test-budget-watchdog");
+      t.setDaemon(true);
+      return t;
+    });
+    watchdog.schedule(() -> sender.cancel(new RuntimeException("test budget 
elapsed")),
+        SEND_BUDGET_NS, TimeUnit.NANOSECONDS);
+
+    try (QueryThreadContext ctx = QueryThreadContext.openForMseTest()) {
+      while (!sender.isTerminated()) {
+        sender.send(block);
+        sendCount++;
+      }
+    } finally {
+      watchdog.shutdownNow();
+    }
+
+    stop.set(true);
+    slowReader.join(TimeUnit.SECONDS.toMillis(10));
+
+    long polledCount = polled.get();
+
+    System.out.printf(Locale.ROOT,
+        "[GrpcSenderBackpressureOffPathTest] sent=%d polled=%d ratio=%.1fx%n",
+        sendCount, polledCount,
+        polledCount == 0 ? Double.POSITIVE_INFINITY : (double) sendCount / 
polledCount);
+
+    // The sender must have pushed far more blocks than the slow receiver could
+    // poll. At 20 ms polling intervals over a 3-second budget the receiver 
sees
+    // roughly 150 polls; with the gate off the sender typically reaches 
hundreds
+    // of thousands of sends. A 10x ratio is an extremely conservative floor —
+    // if the gate were accidentally left active, sendCount would track 
polledCount
+    // closely (ratio ~1–3x) and this assertion would fail.
+    assertTrue(sendCount > polledCount * 10,
+        "Expected the kill-switch to let the sender massively outpace the 
receiver, but "
+            + "sendCount=" + sendCount + " polledCount=" + polledCount
+            + " ratio=" + (polledCount == 0 ? "∞" : sendCount / polledCount)
+            + ". This may indicate the backpressure gate is still active 
despite the flag being false.");
+  }
+
+  private static void sleepQuiet(long ms) {
+    try {
+      Thread.sleep(ms);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTest.java
new file mode 100644
index 00000000000..e80263b286e
--- /dev/null
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTest.java
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+/// Validates that sender-side gRPC back-pressure keeps a fast sender roughly 
in step with a slow receiver.
+///
+/// A "fast sender" pushes the same small data block repeatedly on the test 
thread while a "slow reader"
+/// thread polls the receiving mailbox at roughly 50 blocks per second. With 
back-pressure in place, the
+/// sender thread blocks inside [GrpcSendingMailbox#awaitReady] whenever the 
gRPC outbound queue fills,
+/// so the send rate tracks the polling rate plus a bounded in-flight pipeline.
+///
+/// With the new transport defaults (64 MB HTTP/2 flow-control window and 64 
MB Netty write-buffer
+/// high-water mark), both the gRPC stream window and the Netty WriteQueue can 
buffer far more data
+/// before signalling back-pressure.  As a result, 
[GrpcSendingMailbox#awaitReady] rarely fires the
+/// application-level gate during a 3-second run; the back-pressure that 
*does* apply is transport-level
+/// (the kernel's TCP send buffer and the receiver's gRPC server read loop).  
This means the ratio of
+/// `sendCount` to `polledCount` is much larger than with the old 
narrow-window defaults, and the test
+/// now exercises that transport-level back-pressure rather than the 
application-level gate.
+///
+/// The test asserts two complementary properties:
+///  1. `sendCount` is bounded by a generous multiple of `polledCount` — 
without any back-pressure the
+///     ratio would be orders of magnitude higher as the sender exhausts 
direct memory.  The observed
+///     ratio with the new defaults is around 2200×; the threshold is set to 
~3× that to give headroom
+///     for hardware variation while still catching complete removal of all 
back-pressure.
+///  2. The peak growth of the sender's client allocator stays under a 
generous cap — the wider
+///     in-flight pipeline justified by the larger transport defaults means 
more data can be in-flight
+///     at any moment, so the cap is calibrated to ~3× the observed peak 
growth (~8 MB).
+///
+/// The thresholds are intentionally loose: this is a regression guard against 
all back-pressure being
+/// silently removed, not a precise performance SLA.
+public class GrpcSenderBackpressureTest {
+  private static final DataSchema SCHEMA = new DataSchema(
+      new String[]{"payload"}, new ColumnDataType[]{ColumnDataType.STRING});
+  // Small but not empty — enough that each MailboxContent has a real payload
+  // but small enough that 100k of them comfortably fit in JVM direct memory on
+  // a CI machine.
+  private static final String PAYLOAD = "x".repeat(128);
+  private static final long SEND_BUDGET_NS = TimeUnit.SECONDS.toNanos(3);
+  private static final long READER_POLL_INTERVAL_MS = 20;
+
+  private MailboxService _senderService;
+  private MailboxService _receiverService;
+
+  @BeforeClass
+  public void setUp() {
+    // Both back-pressure features (sender gate + manual receiver-side flow 
control with prefetched
+    // credit) default to off; this test exists to pin the bounded-sender 
behaviour they provide, so we
+    // explicitly turn both on here.
+    PinotConfiguration config = new PinotConfiguration(Map.of(
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED, 
true,
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED,
 true,
+        
CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT, 128));
+    _senderService = new MailboxService("localhost", 
QueryTestUtils.getAvailablePort(),
+        InstanceType.SERVER, config);
+    _senderService.start();
+    _receiverService = new MailboxService("localhost", 
QueryTestUtils.getAvailablePort(),
+        InstanceType.SERVER, config);
+    _receiverService.start();
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _senderService.shutdown();
+    _receiverService.shutdown();
+  }
+
+  @Test
+  public void senderObservesBackpressureFromSlowReceiver()
+      throws Exception {
+    String mailboxId = MailboxIdUtils.toMailboxId(1, 1, 0, 0, 0);
+    long deadlineMs = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(5);
+    StatMap<MailboxSendOperator.StatKey> stats =
+        new StatMap<>(MailboxSendOperator.StatKey.class);
+
+    SendingMailbox sender = _senderService.getSendingMailbox(
+        "localhost", _receiverService.getPort(), mailboxId, deadlineMs, stats);
+    ReceivingMailbox receiver = 
_receiverService.getReceivingMailbox(mailboxId);
+    receiver.registeredReader(() -> { });
+
+    AtomicBoolean stop = new AtomicBoolean(false);
+    AtomicLong polled = new AtomicLong();
+    Thread slowReader = new Thread(() -> {
+      while (!stop.get()) {
+        ReceivingMailbox.MseBlockWithStats msg = receiver.poll();
+        if (msg != null && !msg.getBlock().isEos()) {
+          polled.incrementAndGet();
+        }
+        sleepQuiet(READER_POLL_INTERVAL_MS);
+      }
+    }, "slow-reader");
+    slowReader.setDaemon(true);
+    slowReader.start();
+
+    // Same instance, sent over and over.
+    RowHeapDataBlock block = OperatorTestUtil.block(SCHEMA, new 
Object[]{PAYLOAD});
+
+    // We read memory through the `MailboxService` gauge accessors instead of
+    // `PlatformDependent.usedDirectMemory()`. The gauges:
+    //  * are scoped per `MailboxService`, so the numbers don't leak in from
+    //    other gRPC traffic in the same JVM;
+    //  * report both direct and heap, so they stay meaningful when Netty is
+    //    forced to heap (e.g. `-Dio.netty.noPreferDirect=true`);
+    //  * are exactly the values exported in production as the
+    //    `MAILBOX_CLIENT_USED_*` and `MAILBOX_SERVER_USED_*` gauges.
+    long baselineClient = senderClientPool(_senderService);
+    long baselineServer = receiverServerPool(_receiverService);
+    long peakClient = baselineClient;
+    long peakServer = baselineServer;
+    int sendCount = 0;
+
+    // Watchdog: with back-pressure in place, `sender.send` blocks once the 
gRPC outbound is full, so a wall-clock
+    // deadline checked between sends is not enough to bound the test runtime. 
We cancel the sender from a separate
+    // thread when the budget elapses, which wakes the blocked `awaitReady()` 
waiter and lets the send loop exit
+    // via the `isTerminated()` check.
+    ScheduledExecutorService watchdog = 
Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "test-budget-watchdog");
+      t.setDaemon(true);
+      return t;
+    });
+    watchdog.schedule(() -> sender.cancel(new RuntimeException("test budget 
elapsed")),
+        SEND_BUDGET_NS, TimeUnit.NANOSECONDS);
+
+    try (QueryThreadContext ctx = QueryThreadContext.openForMseTest()) {
+      while (!sender.isTerminated()) {
+        sender.send(block);
+        sendCount++;
+        // Sample pool memory periodically rather than on every send to keep 
the hot loop tight.
+        if ((sendCount & 0xff) == 0) {
+          peakClient = Math.max(peakClient, senderClientPool(_senderService));
+          peakServer = Math.max(peakServer, 
receiverServerPool(_receiverService));
+        }
+      }
+      peakClient = Math.max(peakClient, senderClientPool(_senderService));
+      peakServer = Math.max(peakServer, receiverServerPool(_receiverService));
+    } finally {
+      watchdog.shutdownNow();
+    }
+
+    // RAW_MESSAGES at this point may already include the error EOS the 
watchdog's cancel pushed through, so the
+    // assertion below accepts either `sendCount` or `sendCount + 1`.
+    int rawMessages = stats.getInt(MailboxSendOperator.StatKey.RAW_MESSAGES);
+
+    stop.set(true);
+    slowReader.join(TimeUnit.SECONDS.toMillis(10));
+
+    long polledCount = polled.get();
+    long clientGrowth = peakClient - baselineClient;
+    long serverGrowth = peakServer - baselineServer;
+
+    System.out.printf(Locale.ROOT,
+        "[GrpcSenderBackpressureTest] sent=%d polled=%d ratio=%.1fx%n"
+            + "  sender   MAILBOX_CLIENT_USED_*: direct=%dB heap=%dB (peak 
growth=%dB)%n"
+            + "  receiver MAILBOX_SERVER_USED_*: direct=%dB heap=%dB (peak 
growth=%dB)%n",
+        sendCount, polledCount,
+        polledCount == 0 ? Double.POSITIVE_INFINITY : (double) sendCount / 
polledCount,
+        _senderService.getMailboxClientUsedDirectMemoryBytes(),
+        _senderService.getMailboxClientUsedHeapMemoryBytes(),
+        clientGrowth,
+        _receiverService.getMailboxServerUsedDirectMemoryBytes(),
+        _receiverService.getMailboxServerUsedHeapMemoryBytes(),
+        serverGrowth);
+
+    // RAW_MESSAGES counts every block we pushed through processAndSend, 
including the error EOS the watchdog's
+    // cancel may have emitted. So `rawMessages` is `sendCount` or `sendCount 
+ 1`.
+    assertTrue(rawMessages == sendCount || rawMessages == sendCount + 1,
+        "RAW_MESSAGES (" + rawMessages + ") should equal sendCount (" + 
sendCount + ") or sendCount+1");
+
+    // (1) Bounded in-flight pipeline.
+    //
+    // With the new 64 MB HTTP/2 flow-control window and 64 MB Netty 
write-buffer high-water mark,
+    // the transport can buffer substantially more data before stalling the 
sender.  The observed
+    // ratio with these defaults is ~2200×; we allow ~3× that (7000×) as a 
generous regression guard.
+    // If all back-pressure were removed the sender would exhaust direct 
memory within the 3-second
+    // budget at a ratio several orders of magnitude higher — so this 
threshold still catches that.
+    long allowedSendCount = polledCount * 7000;
+    assertTrue(sendCount < allowedSendCount,
+        "Sender outpaced the receiver beyond the in-flight allowance. sent=" + 
sendCount
+            + " polled=" + polledCount + " allowed=" + allowedSendCount);
+
+    // (2) Bounded sender-side direct memory growth.
+    //
+    // With the wider in-flight pipeline the sender-side allocator may grow by 
up to one Netty pool
+    // chunk (~16 MB) during the 3-second run.  Observed peak growth is ~8 MB; 
we cap at 25 MB
+    // (~3×) to accommodate variation while still catching unbounded 
allocation regressions.
+    long clientGrowthCap = 25L * 1024 * 1024;
+    assertTrue(clientGrowth < clientGrowthCap,
+        "Sender client allocator grew beyond expected steady-state. growth=" + 
clientGrowth
+            + " cap=" + clientGrowthCap);
+  }
+
+  private static void sleepQuiet(long ms) {
+    try {
+      Thread.sleep(ms);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /// Same value the production `MAILBOX_CLIENT_USED_*` gauges report — sum of
+  /// direct + heap so the test stays valid regardless of Netty's buffer mode.
+  private static long senderClientPool(MailboxService service) {
+    return service.getMailboxClientUsedDirectMemoryBytes() + 
service.getMailboxClientUsedHeapMemoryBytes();
+  }
+
+  /// Same value the production `MAILBOX_SERVER_USED_*` gauges report.
+  private static long receiverServerPool(MailboxService service) {
+    return service.getMailboxServerUsedDirectMemoryBytes() + 
service.getMailboxServerUsedHeapMemoryBytes();
+  }
+}
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTightGateTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTightGateTest.java
new file mode 100644
index 00000000000..3df7ceddb77
--- /dev/null
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureTightGateTest.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+/// Pins the application-level back-pressure gate in 
[GrpcSendingMailbox#awaitReady]
+/// under *narrow* transport configs, so that the gate — not the transport 
layer — is
+/// the dominant mechanism keeping the sender in step with the receiver.
+///
+/// ## Why this companion test exists
+///
+/// The original [GrpcSenderBackpressureTest] runs with the production-wide 
defaults
+/// (64 MiB HTTP/2 flow-control window, 64 MiB Netty WriteBufferWaterMark high 
/ 32 MiB
+/// low). With those wide defaults the transport-level back-pressure alone 
bounds the
+/// `sendCount / polledCount` ratio comfortably under any reasonable assertion 
at the
+/// 128-byte payload used by the test. The wide-defaults assertion (`sendCount 
< polledCount * 7000`)
+/// therefore still passes even if a future refactor deletes the entire 
`awaitReady`
+/// machinery — the test no longer guards what it was originally built to 
guard.
+///
+/// A reviewer pointed this out: without a companion that *forces* the 
application gate
+/// to be the dominant signal, the gate could silently regress and CI would 
stay green.
+/// This test closes that gap.
+///
+/// ## What this test pins
+///
+/// The transport pipeline is narrowed to the smallest values gRPC/Netty will 
accept:
+///
+/// * `pinot.query.runner.grpc.flow.control.window.bytes = 65535` — gRPC's 
minimum HTTP/2
+///   stream window.
+/// * `pinot.query.runner.grpc.write.buffer.high.water.mark.bytes = 262144` 
(256 KiB).
+/// * `pinot.query.runner.grpc.write.buffer.low.water.mark.bytes = 131072` 
(128 KiB).
+///
+/// At those sizes the wire layer can absorb only a few hundred 128-byte 
payloads before
+/// signalling back-pressure, so the application-level 
[GrpcSendingMailbox#awaitReady]
+/// gate must engage and park the sender on the 
[java.util.concurrent.locks.Condition]
+/// for the test to satisfy the assertion `sendCount <= polledCount * 50 + 
10_000`.
+///
+/// ## Regression risk this test catches
+///
+/// If someone breaks the `bypassReady || !_backpressureEnabled` short-circuit 
in
+/// `awaitReady`, removes the `Condition` wait, or otherwise deletes the 
application
+/// gate while leaving the transport defaults wide, the wide-defaults 
companion test
+/// will still pass — but this test will fail loudly. The sender will run away 
to many
+/// thousands of times the poll rate, blowing the tight bound asserted here.
+///
+/// ## Test matrix
+///
+/// * [GrpcSenderBackpressureTest] — wide defaults, exercises gate + transport 
in
+///   production-ish conditions. Loose assertion.
+/// * [GrpcSenderBackpressureOffPathTest] — wide defaults with the gate at its 
production
+///   default (off); asserts the pre-fix unbounded sender behaviour is 
preserved.
+/// * This test — narrow transport configs; asserts the application gate alone 
is
+///   enough to keep the sender bounded.
+public class GrpcSenderBackpressureTightGateTest {
+  private static final DataSchema SCHEMA = new DataSchema(
+      new String[]{"payload"}, new ColumnDataType[]{ColumnDataType.STRING});
+  // Same small payload as the companion tests — enough to have a real 
serialized
+  // body but small enough not to exhaust direct memory within the 3-second 
budget.
+  private static final String PAYLOAD = "x".repeat(128);
+  private static final long SEND_BUDGET_NS = TimeUnit.SECONDS.toNanos(3);
+  private static final long READER_POLL_INTERVAL_MS = 20;
+
+  private MailboxService _senderService;
+  private MailboxService _receiverService;
+
+  @BeforeClass
+  public void setUp() {
+    // Narrow the gRPC/Netty transport pipeline to the minimum the stack will 
accept,
+    // so the application-level awaitReady gate becomes the dominant 
back-pressure
+    // mechanism rather than the transport layer.
+    // Window must be >= max-inbound-message-size; shrink both together so the 
application gate (rather than the
+    // window-too-small-for-a-message check in GrpcMailboxServer) is what 
bounds the sender. The two back-pressure
+    // features default to off, so we also have to opt in to both for the gate 
to exist in this test.
+    PinotConfiguration config = new PinotConfiguration(Map.ofEntries(
+        
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED,
 true),
+        
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED,
 true),
+        
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT,
 128),
+        
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES,
 65535),
+        
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES,
 262144),
+        
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES,
 131072),
+        
Map.entry(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
 65535)));
+    _senderService = new MailboxService("localhost", 
QueryTestUtils.getAvailablePort(),
+        InstanceType.SERVER, config);
+    _senderService.start();
+    _receiverService = new MailboxService("localhost", 
QueryTestUtils.getAvailablePort(),
+        InstanceType.SERVER, config);
+    _receiverService.start();
+  }
+
+  @AfterClass
+  public void tearDown() {
+    _senderService.shutdown();
+    _receiverService.shutdown();
+  }
+
+  @Test
+  public void applicationGateBoundsSenderUnderNarrowTransport()
+      throws Exception {
+    String mailboxId = MailboxIdUtils.toMailboxId(1, 1, 0, 0, 0);
+    long deadlineMs = System.currentTimeMillis() + 
TimeUnit.MINUTES.toMillis(5);
+    StatMap<MailboxSendOperator.StatKey> stats =
+        new StatMap<>(MailboxSendOperator.StatKey.class);
+
+    SendingMailbox sender = _senderService.getSendingMailbox(
+        "localhost", _receiverService.getPort(), mailboxId, deadlineMs, stats);
+    ReceivingMailbox receiver = 
_receiverService.getReceivingMailbox(mailboxId);
+    receiver.registeredReader(() -> { });
+
+    AtomicBoolean stop = new AtomicBoolean(false);
+    AtomicLong polled = new AtomicLong();
+    Thread slowReader = new Thread(() -> {
+      while (!stop.get()) {
+        ReceivingMailbox.MseBlockWithStats msg = receiver.poll();
+        if (msg != null && !msg.getBlock().isEos()) {
+          polled.incrementAndGet();
+        }
+        sleepQuiet(READER_POLL_INTERVAL_MS);
+      }
+    }, "slow-reader");
+    slowReader.setDaemon(true);
+    slowReader.start();
+
+    // Same instance, sent over and over.
+    RowHeapDataBlock block = OperatorTestUtil.block(SCHEMA, new 
Object[]{PAYLOAD});
+
+    int sendCount = 0;
+
+    // Watchdog: with the gate active the sender blocks inside awaitReady once 
the gRPC
+    // outbound is full, so a wall-clock deadline checked between sends is not 
enough to
+    // bound the test runtime. We cancel the sender from a separate thread 
when the
+    // budget elapses, which wakes the blocked awaitReady waiter and lets the 
send loop
+    // exit via the isTerminated() check.
+    ScheduledExecutorService watchdog = 
Executors.newSingleThreadScheduledExecutor(r -> {
+      Thread t = new Thread(r, "test-budget-watchdog");
+      t.setDaemon(true);
+      return t;
+    });
+    watchdog.schedule(() -> sender.cancel(new RuntimeException("test budget 
elapsed")),
+        SEND_BUDGET_NS, TimeUnit.NANOSECONDS);
+
+    try (QueryThreadContext ctx = QueryThreadContext.openForMseTest()) {
+      while (!sender.isTerminated()) {
+        sender.send(block);
+        sendCount++;
+      }
+    } finally {
+      watchdog.shutdownNow();
+    }
+
+    stop.set(true);
+    slowReader.join(TimeUnit.SECONDS.toMillis(10));
+
+    long polledCount = polled.get();
+
+    System.out.printf(Locale.ROOT,
+        "[GrpcSenderBackpressureTightGateTest] sent=%d polled=%d 
ratio=%.1fx%n",
+        sendCount, polledCount,
+        polledCount == 0 ? Double.POSITIVE_INFINITY : (double) sendCount / 
polledCount);
+
+    // Tight bound: with the application gate engaged and the transport 
narrowed so it
+    // can buffer at most a few hundred small payloads, the sender should 
track the
+    // 50 polls/sec reader plus a bounded in-flight pipeline. We allow 
`polledCount * 50`
+    // for the per-poll headroom and an additive `10_000` constant to absorb 
startup
+    // burst before the first awaitReady park. If the application gate is 
silently
+    // removed, the sender races to hundreds of thousands of sends and this 
fails.
+    long allowedSendCount = polledCount * 50 + 10_000;
+    assertTrue(sendCount <= allowedSendCount,
+        "Sender ran away despite narrow transport, suggesting the application 
gate did not engage. "
+            + "sendCount=" + sendCount + " polledCount=" + polledCount + " 
allowed=" + allowedSendCount);
+  }
+
+  private static void sleepQuiet(long ms) {
+    try {
+      Thread.sleep(ms);
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+    }
+  }
+}
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSendingMailboxTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSendingMailboxTest.java
index f1e6480064e..8c67f4e7e7d 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSendingMailboxTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSendingMailboxTest.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.query.mailbox;
 
 import com.google.protobuf.ByteString;
+import io.grpc.stub.ClientCallStreamObserver;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
@@ -27,15 +28,24 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 import org.apache.pinot.common.datablock.DataBlock;
 import org.apache.pinot.common.datablock.DataBlockEquals;
 import org.apache.pinot.common.datablock.DataBlockUtils;
 import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.common.datablock.DataBlockBuilder;
 import org.apache.pinot.query.mailbox.channel.ChannelManager;
 import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.blocks.SuccessMseBlock;
 import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
 import org.apache.pinot.segment.spi.memory.CompoundDataBuffer;
 import org.apache.pinot.segment.spi.memory.DataBuffer;
@@ -58,7 +68,7 @@ public class GrpcSendingMailboxTest {
   public void sendDataThrowsWhenQueryTerminated() {
     ChannelManager channelManager = Mockito.mock(ChannelManager.class);
     GrpcSendingMailbox mailbox = new GrpcSendingMailbox("test-mailbox", 
channelManager, "localhost", 0, Long.MAX_VALUE,
-        new StatMap<>(MailboxSendOperator.StatKey.class), 4 * 1024 * 1024);
+        new StatMap<>(MailboxSendOperator.StatKey.class), 4 * 1024 * 1024, 
true);
     RowHeapDataBlock block = new 
RowHeapDataBlock(Collections.singletonList(new Object[]{"val"}),
         new DataSchema(new String[]{"foo"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING}));
 
@@ -71,6 +81,224 @@ public class GrpcSendingMailboxTest {
     }
   }
 
+  /// Regression test for the lazy-initialization data race on 
`_contentObserver`. Before the fix, both `sendInternal`
+  /// and `cancel` had an unsynchronized `if (_contentObserver == null) { 
_contentObserver = getContentObserver(); }`
+  /// pattern. `_contentObserver` is `volatile` so individual reads/writes are 
atomic, but two threads racing through
+  /// the check-then-act could both observe `null` and each call 
`getContentObserver()`, opening two gRPC streams for
+  /// the same mailbox id. The fix funnels both call sites through 
`ensureContentObserverInitialized()`, which uses
+  /// the standard double-checked-lock idiom under `_readyLock`.
+  ///
+  /// The test subclasses [GrpcSendingMailbox] to count `getContentObserver` 
calls and return a no-op observer (so we
+  /// never touch the real gRPC stack). Two threads — sender and canceller — 
are synchronized on a [CyclicBarrier]
+  /// so they enter their respective methods together, then we assert exactly 
one observer was opened. Multiple
+  /// iterations surface the race under timing variance.
+  @Test
+  public void concurrentSendAndCancelInitializeContentObserverExactlyOnce()
+      throws Exception {
+    int iterations = 200;
+    ChannelManager channelManager = Mockito.mock(ChannelManager.class);
+    DataSchema schema =
+        new DataSchema(new String[]{"foo"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING});
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    try {
+      for (int i = 0; i < iterations; i++) {
+        CountingGrpcSendingMailbox mailbox = new 
CountingGrpcSendingMailbox("race-mailbox-" + i, channelManager);
+        RowHeapDataBlock block = new 
RowHeapDataBlock(Collections.singletonList(new Object[]{"val"}), schema);
+
+        CyclicBarrier barrier = new CyclicBarrier(2);
+
+        Future<?> senderFuture = executor.submit(() -> {
+          try (QueryThreadContext ctx = QueryThreadContext.openForMseTest()) {
+            barrier.await();
+            mailbox.send(block);
+          } catch (Exception e) {
+            // Sender may legitimately observe isTerminated() = true if cancel 
won the race to flip the flag before
+            // sendInternal's gate check. That is the cooperative path we 
want; swallow so we can still assert on the
+            // counter.
+          }
+          return null;
+        });
+
+        Future<?> cancelFuture = executor.submit(() -> {
+          try {
+            barrier.await();
+            mailbox.cancel(new RuntimeException("race-test"));
+          } catch (Exception e) {
+            // ignore: cancel is defensive and may catch exceptions from a 
closed observer
+          }
+          return null;
+        });
+
+        senderFuture.get(10, TimeUnit.SECONDS);
+        cancelFuture.get(10, TimeUnit.SECONDS);
+
+        assertEquals(mailbox.getContentObserverCalls(), 1,
+            "Iteration " + i + ": expected exactly one content observer to be 
opened, but got "
+                + mailbox.getContentObserverCalls()
+                + ". Two streams indicates a lazy-init race between send() and 
cancel().");
+      }
+    } finally {
+      executor.shutdownNow();
+      assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), "Executor 
did not terminate in time");
+    }
+  }
+
+  /// Regression test pinning the close()-on-never-sent-mailbox no-op contract.
+  ///
+  /// Before commit aeaacc893d ("Fix close() leaving the gRPC sender stream 
half-open") the close() path had an
+  /// `if (_contentObserver != null)` short-circuit, so a mailbox that was 
constructed, never had `send`/`cancel`
+  /// called on it, and then was closed was a silent no-op on the wire. 
aeaacc893d replaced that short-circuit
+  /// with `ensureContentObserverInitialized()`, which opened a fresh gRPC 
stream just to push an error EOS and
+  /// half-close — three round-trips on a stream that never needed to exist. 
For query stages pruned before any
+  /// data flows that is wasted I/O and a new exception surface (channel open 
can throw at shutdown).
+  ///
+  /// This test reproduces the regression: it constructs a 
[CountingGrpcSendingMailbox], skips `send`/`cancel`,
+  /// calls `close()`, and asserts the content-observer counter stayed at 0. 
On the regressed code the counter
+  /// goes to 1 because `ensureContentObserverInitialized()` opens a stream.
+  ///
+  /// The cancel() path intentionally keeps the eager-open behaviour — there 
may be a receiver-side reader blocked
+  /// on this stream waiting for the cancel signal — so this test is scoped to 
close() only.
+  @Test
+  public void closeOnNeverSentMailboxDoesNotOpenStream()
+      throws Exception {
+    ChannelManager channelManager = Mockito.mock(ChannelManager.class);
+    CountingGrpcSendingMailbox mailbox = new 
CountingGrpcSendingMailbox("close-no-op", channelManager);
+
+    mailbox.close();
+
+    assertEquals(mailbox.getContentObserverCalls(), 0,
+        "close() on a never-sent mailbox must be a silent no-op — opening a 
gRPC stream just to half-close it "
+            + "wastes round-trips and introduces a new exception surface at 
shutdown.");
+    Mockito.verifyNoInteractions(channelManager);
+  }
+
+  /// Test subclass: counts how many times [#getContentObserver] is called and 
returns a Mockito stub that mimics a
+  /// healthy stream (isReady() → true, onNext / onCompleted / cancel are 
no-ops). The real gRPC machinery is never
+  /// touched, so the only thing that controls observer-opening is the 
lazy-init logic inside [GrpcSendingMailbox].
+  private static final class CountingGrpcSendingMailbox extends 
GrpcSendingMailbox {
+    private final AtomicInteger _getContentObserverCalls = new AtomicInteger();
+
+    CountingGrpcSendingMailbox(String id, ChannelManager channelManager) {
+      // backpressureEnabled = false so awaitReady short-circuits without 
touching the observer's isReady() — we want
+      // the test to exercise the lazy-init race specifically, not the 
back-pressure gate.
+      super(id, channelManager, "localhost", 0, Long.MAX_VALUE,
+          new StatMap<>(MailboxSendOperator.StatKey.class), 4 * 1024 * 1024, 
false);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    ClientCallStreamObserver<MailboxContent> getContentObserver() {
+      _getContentObserverCalls.incrementAndGet();
+      ClientCallStreamObserver<MailboxContent> observer = 
Mockito.mock(ClientCallStreamObserver.class);
+      Mockito.when(observer.isReady()).thenReturn(true);
+      return observer;
+    }
+
+    int getContentObserverCalls() {
+      return _getContentObserverCalls.get();
+    }
+  }
+
+  /// Regression test for the EOS-vs-cancel race that throws 
`IllegalStateException("call already half-closed")`.
+  ///
+  /// Before the fix, two interleavings could escape [GrpcSendingMailbox]:
+  ///  * cancel acquires `_readyLock` first, pushes its error EOS payload + 
`onCompleted()`, then `send(Eos)` reaches
+  ///    its outer half-close in `send(MseBlock.Eos)` and calls 
`_contentObserver.onCompleted()` on the now
+  ///    half-closed stream — `ClientCallStreamObserver` raises 
`IllegalStateException`.
+  ///  * cancel completes fully (payload + `onCompleted`) before `send(Eos)`'s 
`sendContent` even acquires the lock;
+  ///    `sendContent` skips its in-lock `isTerminated()` re-check because 
`bypassReady=true` and calls
+  ///    `_contentObserver.onNext(content)` on the half-closed stream — same 
`IllegalStateException`.
+  ///
+  /// The test uses a [HalfCloseEnforcingMailbox] whose observer mimics the 
gRPC contract: once `onCompleted()` has
+  /// been observed, subsequent `onNext()` / `onCompleted()` raise 
`IllegalStateException`. Two threads — one calling
+  /// `send(SuccessMseBlock)` and one calling `cancel(...)` — are released 
through a [CyclicBarrier] each iteration.
+  /// Across many iterations, both interleavings show up.
+  @Test
+  public void concurrentSendEosAndCancelDoesNotThrow()
+      throws Exception {
+    int iterations = 200;
+    ChannelManager channelManager = Mockito.mock(ChannelManager.class);
+
+    ExecutorService executor = Executors.newFixedThreadPool(2);
+    try {
+      for (int i = 0; i < iterations; i++) {
+        HalfCloseEnforcingMailbox mailbox = new 
HalfCloseEnforcingMailbox("eos-race-" + i, channelManager);
+        CyclicBarrier barrier = new CyclicBarrier(2);
+
+        Future<Throwable> senderFuture = executor.submit(() -> {
+          Throwable thrown = null;
+          try (QueryThreadContext ctx = QueryThreadContext.openForMseTest()) {
+            barrier.await();
+            mailbox.send(SuccessMseBlock.INSTANCE, List.of());
+          } catch (Throwable t) {
+            thrown = t;
+          }
+          return thrown;
+        });
+
+        Future<Throwable> cancelFuture = executor.submit(() -> {
+          Throwable thrown = null;
+          try {
+            barrier.await();
+            mailbox.cancel(new RuntimeException("eos-race-test"));
+          } catch (Throwable t) {
+            thrown = t;
+          }
+          return thrown;
+        });
+
+        Throwable senderError = senderFuture.get(10, TimeUnit.SECONDS);
+        Throwable cancelError = cancelFuture.get(10, TimeUnit.SECONDS);
+
+        Assert.assertNull(senderError, "Iteration " + i + ": send(Eos) threw "
+            + (senderError != null ? senderError.toString() : "null")
+            + ". The EOS path must swallow IllegalStateException from a racing 
cancel.");
+        Assert.assertNull(cancelError, "Iteration " + i + ": cancel() threw "
+            + (cancelError != null ? cancelError.toString() : "null"));
+        assertTrue(mailbox.isTerminated(),
+            "Iteration " + i + ": mailbox should be terminated after send(Eos) 
+ cancel");
+      }
+    } finally {
+      executor.shutdownNow();
+      assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS), "Executor 
did not terminate in time");
+    }
+  }
+
+  /// Test subclass whose stub observer enforces the real gRPC half-close 
contract: once `onCompleted()` is observed,
+  /// subsequent `onNext()` / `onCompleted()` raise 
`IllegalStateException("call already half-closed")`. That's the
+  /// exact escape path the EOS-vs-cancel fix prevents — without it, `send(Eos)` and `cancel` 
racing would propagate this exception
+  /// out of `GrpcSendingMailbox`.
+  private static final class HalfCloseEnforcingMailbox extends 
GrpcSendingMailbox {
+    HalfCloseEnforcingMailbox(String id, ChannelManager channelManager) {
+      // backpressureEnabled = false so awaitReady short-circuits without 
spinning on isReady(); we want to exercise
+      // the half-close ordering, not the back-pressure gate.
+      super(id, channelManager, "localhost", 0, Long.MAX_VALUE,
+          new StatMap<>(MailboxSendOperator.StatKey.class), 4 * 1024 * 1024, 
false);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    ClientCallStreamObserver<MailboxContent> getContentObserver() {
+      ClientCallStreamObserver<MailboxContent> observer = 
Mockito.mock(ClientCallStreamObserver.class);
+      AtomicBoolean halfClosed = new AtomicBoolean();
+      Mockito.when(observer.isReady()).thenReturn(true);
+      Mockito.doAnswer(invocation -> {
+        if (halfClosed.get()) {
+          throw new IllegalStateException("call already half-closed");
+        }
+        return null;
+      }).when(observer).onNext(Mockito.any());
+      Mockito.doAnswer(invocation -> {
+        if (!halfClosed.compareAndSet(false, true)) {
+          throw new IllegalStateException("call already half-closed");
+        }
+        return null;
+      }).when(observer).onCompleted();
+      return observer;
+    }
+  }
+
   @Test(dataProvider = "byteBuffersDataProvider")
   public void testByteBuffersToByteStrings(int[] byteBufferSizes, int 
maxByteStringSize) {
     List<ByteBuffer> input =
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/ChannelManagerTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/ChannelManagerTest.java
index 1ba1da8bc8f..11faa327dc1 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/ChannelManagerTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/ChannelManagerTest.java
@@ -24,6 +24,7 @@ import java.lang.reflect.Field;
 import java.time.Duration;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
@@ -38,14 +39,18 @@ public class ChannelManagerTest {
 
   @Test
   public void testResetConnectBackoffNoOpForUnknownChannel() {
-    ChannelManager channelManager = new ChannelManager(null, 4_000_000, 
Duration.ofDays(365));
+    ChannelManager channelManager = new ChannelManager(null, 4_000_000, 
Duration.ofDays(365),
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES);
     // Should return false and not throw when no channel exists for the given 
host/port
     assertFalse(channelManager.resetConnectBackoff("unknown-host", 12345));
   }
 
   @Test
   public void testResetConnectBackoffNoOpWhenNotInTransientFailure() {
-    ChannelManager channelManager = new ChannelManager(null, 4_000_000, 
Duration.ofDays(365));
+    ChannelManager channelManager = new ChannelManager(null, 4_000_000, 
Duration.ofDays(365),
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES);
     // Create a channel by calling getChannel
     ManagedChannel channel = channelManager.getChannel("localhost", 12345);
     try {
@@ -63,7 +68,9 @@ public class ChannelManagerTest {
   @SuppressWarnings("unchecked")
   public void testResetConnectBackoffResetsWhenInTransientFailure()
       throws Exception {
-    ChannelManager channelManager = new ChannelManager(null, 4_000_000, 
Duration.ofDays(365));
+    ChannelManager channelManager = new ChannelManager(null, 4_000_000, 
Duration.ofDays(365),
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES,
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES);
 
     ManagedChannel mockChannel = mock(ManagedChannel.class);
     
when(mockChannel.getState(false)).thenReturn(ConnectivityState.TRANSIENT_FAILURE);
@@ -78,4 +85,26 @@ public class ChannelManagerTest {
     assertTrue(channelManager.resetConnectBackoff("failing-host", 9999));
     verify(mockChannel).resetConnectBackoff();
   }
+
+  /// Pins the fail-fast gate added in commit 1d29438dc0 ("Fail fast on 
invalid gRPC mailbox transport
+  /// configuration"): a non-positive `writeBufferLowWaterMarkBytes` must 
throw at startup rather than
+  /// surfacing later as a Netty `WriteBufferWaterMark` constructor failure on 
the first send.
+  @Test(expectedExceptions = IllegalArgumentException.class,
+      expectedExceptionsMessageRegExp = ".*writeBufferLowWaterMarkBytes must 
be positive.*")
+  public void testConstructorRejectsZeroWriteBufferLowWaterMark() {
+    new ChannelManager(null, 4_000_000, Duration.ofDays(365),
+        
CommonConstants.MultiStageQueryRunner.DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES,
+        0);
+  }
+
+  /// Pins the eager `new WriteBufferWaterMark(low, high)` invariant: when 
`low > high`, Netty's own
+  /// constructor throws `IllegalArgumentException`. Constructing the 
watermark eagerly in
+  /// `ChannelManager` (added in 1d29438dc0) is what makes this surface at 
startup instead of on the
+  /// first send to a previously-unseen peer.
+  @Test(expectedExceptions = IllegalArgumentException.class)
+  public void testConstructorRejectsLowWatermarkAboveHighWatermark() {
+    new ChannelManager(null, 4_000_000, Duration.ofDays(365),
+        32 * 1024 * 1024,  // high
+        64 * 1024 * 1024); // low > high
+  }
 }
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServerValidationTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServerValidationTest.java
new file mode 100644
index 00000000000..307b2da343a
--- /dev/null
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/GrpcMailboxServerValidationTest.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox.channel;
+
+import java.util.Map;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants.MultiStageQueryRunner;
+import org.testng.annotations.Test;
+
+
+/// Negative-path tests for the fail-fast startup gates added to {@link 
GrpcMailboxServer} in commit
+/// 1d29438dc0 ("Fail fast on invalid gRPC mailbox transport configuration"). 
Each test wires the
+/// corresponding bad configuration value through a {@link MailboxService}; 
the underlying
+/// {@code Preconditions.checkArgument} fires inside the {@code 
GrpcMailboxServer} constructor that
+/// {@link MailboxService#start()} invokes, so the failure surfaces during 
{@code start()} — which is
+/// the whole point of fail-at-startup.
+public class GrpcMailboxServerValidationTest {
+
+  /// Pins the gate {@code _inboundMessageCredit > 0} in
+  /// {@link GrpcMailboxServer}: a zero credit value makes the manual inbound 
flow-control prefetch a no-op
+  /// (the server never calls {@code request(...)} with a positive amount) so 
the stream would stall
+  /// immediately. The startup gate ensures this misconfiguration is rejected 
at boot rather than producing
+  /// silent hangs on the first query.
+  @Test(expectedExceptions = IllegalArgumentException.class,
+      expectedExceptionsMessageRegExp = ".*inbound.message.credit.*must be 
positive.*")
+  public void testStartRejectsZeroInboundMessageCredit() {
+    PinotConfiguration config = new PinotConfiguration(Map.of(
+        MultiStageQueryRunner.KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT, 0));
+    MailboxService mailboxService = new MailboxService(
+        "localhost", QueryTestUtils.getAvailablePort(), InstanceType.BROKER, 
config);
+    // start() builds the GrpcMailboxServer; the precondition fires inside 
that constructor and propagates
+    // out — no shutdown needed because the server never finished starting.
+    mailboxService.start();
+  }
+
+  /// Pins the gate {@code _flowControlWindowBytes >= maxInboundMessageSize} in
+  /// {@link GrpcMailboxServer}: a window smaller than the largest possible 
single message produces a
+  /// pathological stream where {@code isReady()} flaps on every message 
because no single message can
+  /// ever fit in the available credit. Verifies that a 1 KiB flow-control 
window combined with the
+  /// default 16 MiB max inbound message size is rejected at startup.
+  @Test(expectedExceptions = IllegalArgumentException.class,
+      expectedExceptionsMessageRegExp = ".*flow.control.window.*must be 
>=.*max.msg.size.*")
+  public void testStartRejectsFlowControlWindowSmallerThanMaxMessageSize() {
+    PinotConfiguration config = new PinotConfiguration(Map.of(
+        MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES, 1024));
+    // Leave KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES at its default (16 
MiB) so the gate is
+    // exercised purely by the too-small flow-control window.
+    MailboxService mailboxService = new MailboxService(
+        "localhost", QueryTestUtils.getAvailablePort(), InstanceType.BROKER, 
config);
+    mailboxService.start();
+  }
+}
diff --git 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserverTest.java
 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserverTest.java
index 7414195c63f..866e2592fc2 100644
--- 
a/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserverTest.java
+++ 
b/pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/channel/MailboxContentObserverTest.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.query.mailbox.channel;
 
 import com.google.protobuf.ByteString;
-import io.grpc.stub.StreamObserver;
+import io.grpc.stub.ServerCallStreamObserver;
 import org.apache.pinot.common.proto.Mailbox.MailboxContent;
 import org.apache.pinot.common.proto.Mailbox.MailboxStatus;
 import org.apache.pinot.query.mailbox.MailboxService;
@@ -38,8 +38,10 @@ public class MailboxContentObserverTest {
     MailboxService mailboxService = mock(MailboxService.class);
     ReceivingMailbox receivingMailbox = mock(ReceivingMailbox.class);
     
when(mailboxService.getReceivingMailbox(TEST_MAILBOX_ID)).thenReturn(receivingMailbox);
-    StreamObserver<MailboxStatus> mockStatusObserver = 
mock(StreamObserver.class);
-    MailboxContentObserver observer = new 
MailboxContentObserver(mailboxService, TEST_MAILBOX_ID, mockStatusObserver);
+    @SuppressWarnings("unchecked")
+    ServerCallStreamObserver<MailboxStatus> mockStatusObserver = 
mock(ServerCallStreamObserver.class);
+    MailboxContentObserver observer =
+        new MailboxContentObserver(mailboxService, TEST_MAILBOX_ID, 
mockStatusObserver, true);
     verify(mailboxService, times(1)).getReceivingMailbox(TEST_MAILBOX_ID);
 
     // Now simulate receiving a mailbox content message
@@ -52,4 +54,56 @@ public class MailboxContentObserverTest {
     verify(mailboxService, times(1)).getReceivingMailbox(TEST_MAILBOX_ID);
     verifyNoMoreInteractions(mailboxService);
   }
+
+  @Test
+  public void testOnNextReplenishesInboundCredit() {
+    MailboxService mailboxService = mock(MailboxService.class);
+    ReceivingMailbox receivingMailbox = mock(ReceivingMailbox.class);
+    
when(mailboxService.getReceivingMailbox(TEST_MAILBOX_ID)).thenReturn(receivingMailbox);
+    @SuppressWarnings("unchecked")
+    ServerCallStreamObserver<MailboxStatus> mockStatusObserver = 
mock(ServerCallStreamObserver.class);
+    MailboxContentObserver observer =
+        new MailboxContentObserver(mailboxService, TEST_MAILBOX_ID, 
mockStatusObserver, true);
+
+    // Each onNext call should request(1) exactly once, regardless of whether 
the stream is already closed.
+    MailboxContent mockContent = mock(MailboxContent.class);
+    when(mockContent.getPayload()).thenReturn(ByteString.copyFrom(new 
byte[0]));
+    when(mockContent.getMailboxId()).thenReturn(TEST_MAILBOX_ID);
+
+    observer.onNext(mockContent);
+    verify(mockStatusObserver, times(1)).request(1);
+
+    observer.onNext(mockContent);
+    verify(mockStatusObserver, times(2)).request(1);
+  }
+
+  /// Asserts the manual-inbound-flow-control kill-switch
+  /// (`pinot.query.runner.grpc.manual.inbound.flow.control.enabled` = 
`false`) really removes the
+  /// `request(1)` replenishment from the top of `onNext`. With the flag off, 
gRPC's auto-inbound is in
+  /// place and will replenish 1 credit after `onNext` returns; calling 
`request(1)` here too would
+  /// double-count and defeat the rollback.
+  ///
+  /// If someone removes the `if (_manualInboundFlowControlEnabled)` guard, 
this test fails and surfaces
+  /// the regression in CI.
+  @Test
+  public void testOnNextDoesNotReplenishCreditWhenManualFlowControlDisabled() {
+    MailboxService mailboxService = mock(MailboxService.class);
+    ReceivingMailbox receivingMailbox = mock(ReceivingMailbox.class);
+    
when(mailboxService.getReceivingMailbox(TEST_MAILBOX_ID)).thenReturn(receivingMailbox);
+    @SuppressWarnings("unchecked")
+    ServerCallStreamObserver<MailboxStatus> mockStatusObserver = 
mock(ServerCallStreamObserver.class);
+    MailboxContentObserver observer =
+        new MailboxContentObserver(mailboxService, TEST_MAILBOX_ID, 
mockStatusObserver, false);
+
+    MailboxContent mockContent = mock(MailboxContent.class);
+    when(mockContent.getPayload()).thenReturn(ByteString.copyFrom(new 
byte[0]));
+    when(mockContent.getMailboxId()).thenReturn(TEST_MAILBOX_ID);
+
+    observer.onNext(mockContent);
+    observer.onNext(mockContent);
+
+    // With manual flow control disabled, onNext must NOT call request(1) — 
gRPC's auto-inbound machinery
+    // handles credit replenishment after onNext returns.
+    verify(mockStatusObserver, never()).request(anyInt());
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 5db5055117e..840dd97db51 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -2335,6 +2335,139 @@ public class CommonConstants {
     public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES 
= "pinot.query.runner.max.msg.size.bytes";
     public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES = 
16 * 1024 * 1024;
 
+    /// Whether the sender side of every `GrpcSendingMailbox` respects gRPC 
client-side flow control by waiting
+    /// on [io.grpc.stub.ClientCallStreamObserver#isReady] before pushing each 
chunk.
+    ///
+    /// Default `false` — the gate is **opt-in**. When `false`, the sender 
pushes unconditionally and the
+    /// behaviour is identical to the pre-PR-#18519 unbounded path. Set to 
`true` to engage the
+    /// `isReady()`-gated wait that bounds the gRPC client allocator against 
the `OutOfDirectMemoryError`
+    /// failure mode described in #18519. Operators who hit that OOM (slow 
consumer / large fan-out / skewed
+    /// shuffle) should flip this on.
+    ///
+    /// Also used as an A/B knob for benchmarks (see 
`BenchmarkGrpcMailboxSend`).
+    public static final String KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED =
+        "pinot.query.runner.grpc.sender.backpressure.enabled";
+    public static final boolean DEFAULT_GRPC_SENDER_BACKPRESSURE_ENABLED = 
false;
+
+    /// Per-stream HTTP/2 flow control window, in bytes. The receiver 
advertises this value to the sender as
+    /// the number of bytes it will accept before requiring a `WINDOW_UPDATE` 
frame. Wider windows let the
+    /// sender push a whole `MseBlock` without 
[io.grpc.stub.ClientCallStreamObserver#isReady] flipping
+    /// mid-block. Applied via `NettyServerBuilder.flowControlWindow` in 
`GrpcMailboxServer`.
+    ///
+    /// This is per HTTP/2 stream, so total inbound buffering at the receiver 
scales as
+    /// `value × #concurrent streams to this server`. Concretely:
+    /// `Peak receiver direct memory ≈ flowControlWindow × 
#concurrent_incoming_streams`.
+    ///
+    /// This value is the **per-stalled-stream receiver-side direct-memory 
exposure**, not just a throughput
+    /// knob: when an inbound stream's receiver application queue stalls (e.g. 
the downstream operator is slow
+    /// to drain via 
[org.apache.pinot.query.mailbox.channel.MailboxContentObserver#onNext]), the 
wire can
+    /// still buffer up to `flowControlWindow` bytes of data on that stream 
before the HTTP/2 peer stops
+    /// sending.
+    ///
+    /// Because it bounds direct memory, operators must size this value 
against
+    /// `-XX:MaxDirectMemorySize` given the expected concurrent inbound stream 
count.
+    ///
+    /// Receiver-side counterpart to 
[#KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES] (the sender-side
+    /// outbound queue cap). The two are aligned at the same default by design 
— they cap roughly the
+    /// same conceptual thing (one peer's worth of in-flight bytes) from the 
two ends of the wire — but
+    /// kept as separate keys so operators can tune them independently for 
asymmetric workloads.
+    public static final String KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES =
+        "pinot.query.runner.grpc.flow.control.window.bytes";
+    public static final int DEFAULT_GRPC_FLOW_CONTROL_WINDOW_BYTES = 64 * 1024 
* 1024;
+
+    /// Netty per-channel WriteQueue high watermark, in bytes. Applied via
+    /// `ChannelOption.WRITE_BUFFER_WATER_MARK` on the sender's 
`NettyChannelBuilder`. When the channel's
+    /// outbound queue exceeds this value, `Channel.isWritable()` flips to 
`false` and gRPC's
+    /// [io.grpc.stub.ClientCallStreamObserver#isReady] returns `false` until 
the queue drops below the low
+    /// watermark.
+    ///
+    /// This is a per-channel (per `host:port`) setting, shared across all 
streams to that peer. The
+    /// sender's direct-memory footprint is therefore bounded by `value × 
#peers`, not by
+    /// `value × #streams`. Concretely:
+    /// `Peak sender direct memory ≈ writeBufferHighWaterMark × #peers` (one 
channel per peer, shared across
+    /// streams to that peer).
+    ///
+    /// This is a direct-memory bound, not just a throughput knob: operators 
must size it against
+    /// `-XX:MaxDirectMemorySize` given the expected per-query peer fan-out 
and the number of concurrent
+    /// queries. Pairs with [#KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES].
+    ///
+    /// Sender-side counterpart to [#KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES] 
(the receiver-side inbound
+    /// window). The two are aligned at the same default by design, sized 
together to bound one peer's
+    /// worth of in-flight bytes from each end of the wire.
+    public static final String KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES =
+        "pinot.query.runner.grpc.write.buffer.high.water.mark.bytes";
+    public static final int DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES = 
64 * 1024 * 1024;
+
+    /// Netty per-channel WriteQueue low watermark, in bytes. Once the 
WriteQueue has exceeded the high
+    /// watermark (see [#KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES] and 
the
+    /// `writeBufferHighWaterMark × #peers` direct-memory formula documented 
there), it must drop below this
+    /// value before `Channel.isWritable()` flips back to `true`. 
Conventionally set to ~50% of the high
+    /// watermark.
+    ///
+    /// The gap `(high − low)` is the drain hysteresis the channel must clear 
before becoming writable
+    /// again: setting `low` too close to `high` makes the channel flap 
writable/unwritable on every
+    /// small drain; setting it too low forces the sender to wait longer 
between writable windows. The
+    /// low watermark itself does not change the peak direct-memory bound — 
that is set by the high
+    /// watermark — but it controls how aggressively the channel reopens once 
back-pressure has engaged.
+    public static final String KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES =
+        "pinot.query.runner.grpc.write.buffer.low.water.mark.bytes";
+    public static final int DEFAULT_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES = 
32 * 1024 * 1024;
+
+    /// Number of inbound gRPC messages the receiver will accept in flight per 
stream, before requiring the
+    /// application to consume one (via 
[org.apache.pinot.query.mailbox.channel.MailboxContentObserver#onNext]
+    /// returning). Implemented by disabling gRPC's default 
auto-inbound-flow-control on the server side and
+    /// calling [io.grpc.stub.ServerCallStreamObserver#request] explicitly. 
Only takes effect when
+    /// [#KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED] is `true` (off by 
default).
+    ///
+    /// Default `1`, which mirrors gRPC's auto-inbound-flow-control behaviour 
(one message in flight). Even
+    /// when [#KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED] is flipped on, 
this conservative default
+    /// keeps the in-flight window at one message until the operator 
explicitly widens it.
+    ///
+    /// Larger values let the sender pipeline more messages without waiting 
for per-message round trips,
+    /// which is the primary throughput knob for small / medium MSE blocks. 
Memory exposure on the receiver
+    /// is still bounded by the HTTP/2 stream window (see 
[#KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES]), so this
+    /// credit count is effectively a per-stream message-count limit on top of 
the byte-count limit.
+    /// Whichever fires first applies.
+    ///
+    /// ## Cancel-propagation tradeoff
+    ///
+    /// Higher credit values widen the in-flight window, which improves 
throughput for small/medium blocks
+    /// but also **widens worst-case cancel-propagation latency** when the 
receiver's application queue
+    /// (capacity 5 by default) is stuck. The sender's
+    /// [org.apache.pinot.query.mailbox.GrpcSendingMailbox#cancel] pushes an 
error EOS **in-band** on the
+    /// same gRPC stream as data; when the receiver's dispatch thread is 
parked in `_notFull.await`, that
+    /// EOS sits behind every inbound message that already made it past flow 
control. Worst-case cancel
+    /// latency is bounded by `min(credit messages, flowControlWindow bytes)` 
worth of buffered inbound data
+    /// that has to drain before the EOS reaches the application.
+    ///
+    /// Note that this hang surface is **pre-existing** — the in-band EOS path 
can stall even with gRPC's
+    /// auto-inbound default of 1 in-flight message if the receiver's 
application queue is permanently
+    /// stuck (e.g. the consumer is gone). The credit value just controls how 
much worse the latency gets
+    /// before the hang surfaces. See 
https://github.com/apache/pinot/issues/18541 for the proper
+    /// out-of-band cancel work.
+    public static final String KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT =
+        "pinot.query.runner.grpc.inbound.message.credit";
+    public static final int DEFAULT_GRPC_INBOUND_MESSAGE_CREDIT = 1;
+
+    /// Whether the receiver overrides gRPC's auto-inbound-flow-control on the 
mailbox stream and prefetches
+    /// [#KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT] messages of credit up-front, 
then replenishes one credit
+    /// before each `onNext` does the (possibly blocking) hand-off to the 
application queue.
+    ///
+    /// Default `false` — the manual-flow-control path is **opt-in**. When 
`false` (default), the receiver
+    /// leaves gRPC's auto-inbound in place (only 1 message in flight at a 
time, post-`onNext`-return credit
+    /// replenishment), which is the pre-PR-#18519 behaviour. Set to `true` to 
engage the manual prefetch +
+    /// pre-`offerRaw` credit replenishment introduced in #18519, which is the 
primary throughput knob for
+    /// small/medium MSE blocks.
+    ///
+    /// Cancel-propagation latency is bounded more tightly when this is 
`false`, but the worst case (a stuck
+    /// receiver dispatch thread) is still possible because the sender's 
cancel travels in-band; see
+    /// https://github.com/apache/pinot/issues/18541.
+    ///
+    /// This is an independent opt-in from 
[#KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED]; the two control
+    /// different sides of the mailbox path.
+    public static final String KEY_OF_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED 
=
+        "pinot.query.runner.grpc.manual.inbound.flow.control.enabled";
+    public static final boolean 
DEFAULT_GRPC_MANUAL_INBOUND_FLOW_CONTROL_ENABLED = false;
 
     /**
      * Configuration for channel idle timeout in seconds.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to