yashmayya commented on code in PR #18519:
URL: https://github.com/apache/pinot/pull/18519#discussion_r3262636007


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -102,7 +128,13 @@ public void send(MseBlock.Data data) {
 
   @Override
   public void send(MseBlock.Eos block, List<DataBuffer> serializedStats) {
-    if (sendInternal(block, serializedStats)) {
+    // EOS blocks (success or error) carry control-plane info — including the original error code on the error path —
+    // and must always reach the receiver, so they bypass the back-pressure gate. Bypassing also disables the
+    // cooperative termination poll inside [#awaitReady]; without it, a terminate signal raised while the sender is
+    // mid-way through pushing an error EOS would unwind [#sendInternal] with a TerminationException, leave
+    // [#_senderSideClosed] false, and let [#cancel] run and overwrite the original error code with
+    // QUERY_CANCELLATION on the receiver side.
+    if (sendInternal(block, serializedStats, /* bypassReady */ true)) {
       LOGGER.debug("Completing mailbox: {}", _id);
       _contentObserver.onCompleted();

Review Comment:
   On the success path, `_contentObserver.onCompleted()` runs before
`_senderSideClosed = true`. If a concurrent `cancel()` (from an OpChain
on-failure callback or an external watchdog) arrives in that window,
`isTerminated()` still returns false, so `cancel()` proceeds to push its own
error EOS via `_contentObserver.onNext(...)` *after* `onCompleted()`, and gRPC
then throws `IllegalStateException`. Swapping the two statements, so that
`_senderSideClosed = true` is set before `onCompleted()`, is a minimal fix.
   
   Separately: after `awaitReady` returns true in `sendContent` (line 320) 
there is no `isTerminated()` re-check before `_contentObserver.onNext(content)` 
at line 330, so the data-block sender can still race a concurrent cancel that 
calls `onNext`/`onCompleted` on the same (non-thread-safe) 
`ClientCallStreamObserver`. Consider serializing the terminal calls under 
`_readyLock`, or re-checking `_senderSideClosed` after `awaitReady`.
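A minimal sketch of both suggestions under a simplified model. `FakeStream` is a hypothetical stand-in for the non-thread-safe `ClientCallStreamObserver` that, like gRPC's client-side adapter, rejects `onNext` after `onCompleted` with an `IllegalStateException`; `SketchMailbox`, `sendData`, `completeWithEos` and `cancel` are illustrative names, not the PR's actual methods. Every call on the stream goes through one lock that re-checks `_senderSideClosed`, and the flag is set before `onCompleted()`, so a late `cancel()` or data send becomes a no-op instead of a second terminal call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for ClientCallStreamObserver: not thread-safe, and it
// rejects onNext() after onCompleted(), mirroring gRPC's IllegalStateException.
class FakeStream {
  final List<String> sent = new ArrayList<>();
  private boolean _completed;

  void onNext(String msg) {
    if (_completed) {
      throw new IllegalStateException("Stream is already completed");
    }
    sent.add(msg);
  }

  void onCompleted() {
    _completed = true;
  }
}

// Hypothetical sender: every stream call is serialized under one lock, and the
// closed flag is flipped before onCompleted() so lock-free readers see it too.
class SketchMailbox {
  private final ReentrantLock _lock = new ReentrantLock();
  private final FakeStream _stream;
  private volatile boolean _senderSideClosed;

  SketchMailbox(FakeStream stream) {
    _stream = stream;
  }

  boolean isTerminated() {
    return _senderSideClosed;
  }

  // Data path: in the real code this would run after awaitReady() returns true;
  // re-checking the flag under the lock closes the check-then-act gap with cancel().
  boolean sendData(String block) {
    _lock.lock();
    try {
      if (_senderSideClosed) {
        return false;
      }
      _stream.onNext(block);
      return true;
    } finally {
      _lock.unlock();
    }
  }

  // Success-path EOS: push the EOS, mark closed, then complete the stream.
  void completeWithEos(String eos) {
    _lock.lock();
    try {
      if (_senderSideClosed) {
        return;
      }
      _stream.onNext(eos);
      _senderSideClosed = true;
      _stream.onCompleted();
    } finally {
      _lock.unlock();
    }
  }

  // Cancel: a no-op once the sender has already closed the stream.
  void cancel() {
    _lock.lock();
    try {
      if (_senderSideClosed) {
        return;
      }
      _senderSideClosed = true;
      _stream.onNext("ERROR_EOS");
      _stream.onCompleted();
    } finally {
      _lock.unlock();
    }
  }
}
```

Whether the existing `_readyLock` can double as this lock depends on how the real `awaitReady` loop holds it; that is an assumption worth verifying against the PR.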



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -208,17 +263,65 @@ public boolean isTerminated() {
     return _senderSideClosed || _statusObserver.isFinished();
   }
 
-  private StreamObserver<MailboxContent> getContentObserver() {
+  private ClientCallStreamObserver<MailboxContent> getContentObserver() {
     Metadata metadata = new Metadata();
     metadata.put(ChannelUtils.MAILBOX_ID_METADATA_KEY, _id);
 
-    return PinotMailboxGrpc.newStub(_channelManager.getChannel(_hostname, _port))
+    // We wrap `_statusObserver` in a ClientResponseObserver so we can register the on-ready handler through
+    // `beforeStart` — gRPC rejects setOnReadyHandler() if it is called after open() returns. Wrapping (rather than
+    // making MailboxStatusObserver itself a ClientResponseObserver) keeps the back-pressure plumbing local to this
+    // class. The wrapper delegates the data callbacks unchanged, and signals our `_readyCond` on stream close so a
+    // blocked sender wakes up to observe `_statusObserver.isFinished()` becoming true.
+    ClientResponseObserver<MailboxContent, MailboxStatus> responseObserver =
+        new ClientResponseObserver<MailboxContent, MailboxStatus>() {
+          @Override
+          public void beforeStart(ClientCallStreamObserver<MailboxContent> requestStream) {
+            // Fires on a gRPC channel/Netty thread whenever isReady() transitions false -> true. Just signal; the
+            // sender re-checks the predicate after waking.
+            requestStream.setOnReadyHandler(GrpcSendingMailbox.this::wakeWaiters);
+          }
+
+          @Override
+          public void onNext(MailboxStatus value) {
+            _statusObserver.onNext(value);

Review Comment:
   The wrapper calls `wakeWaiters()` from `onError`/`onCompleted` but not from
`onNext`. When the receiver requests early termination
(`MAILBOX_METADATA_REQUEST_EARLY_TERMINATE`), it does *not* close the stream:
`MailboxContentObserver` only sets `WAITING_EOS` and keeps the stream open for
the sender's EOS. A sender blocked in `awaitReady` on a congested stream
therefore won't observe the early-terminate flip until either the stream closes
(it won't, in this case) or the deadline elapses, because nothing wakes it on
`onNext` and `awaitReady` does not check `isEarlyTerminated()` when it does
wake. Calling `wakeWaiters()` here (after delegating to
`_statusObserver.onNext`), together with adding `isEarlyTerminated()` to the
predicates `awaitReady` re-checks, lets the sender wake and bail out promptly.
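A minimal sketch of the suggested wake-up path, assuming a simplified gate: `ReadyGate`, `onStatus`, `onClosed` and `Outcome` are hypothetical names, and the three volatile flags stand in for `requestStream.isReady()`, `_statusObserver.isFinished()` and the early-terminate flag. The `onNext` analogue records the status and then signals, and `awaitReady` re-checks early termination alongside readiness and stream close, so a parked sender returns without waiting for its deadline.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical, simplified readiness gate; names are illustrative, not the PR's.
class ReadyGate {
  enum Outcome { READY, FINISHED, EARLY_TERMINATED, TIMED_OUT, INTERRUPTED }

  private final ReentrantLock _readyLock = new ReentrantLock();
  private final Condition _readyCond = _readyLock.newCondition();
  private volatile boolean _ready;            // stands in for requestStream.isReady()
  private volatile boolean _finished;         // stands in for _statusObserver.isFinished()
  private volatile boolean _earlyTerminated;  // stands in for the early-terminate flag

  void wakeWaiters() {
    _readyLock.lock();
    try {
      _readyCond.signalAll();
    } finally {
      _readyLock.unlock();
    }
  }

  // On-ready handler analogue: gRPC invokes it when isReady() flips to true.
  void onReady() {
    _ready = true;
    wakeWaiters();
  }

  // Wrapper onNext analogue: delegate (record the status) first, then wake the sender.
  void onStatus(boolean earlyTerminate) {
    if (earlyTerminate) {
      _earlyTerminated = true;
    }
    wakeWaiters();
  }

  // Wrapper onError/onCompleted analogue: the stream is closed.
  void onClosed() {
    _finished = true;
    wakeWaiters();
  }

  // Parks until ready, closed, early-terminated, interrupted, or the timeout elapses.
  Outcome awaitReady(long timeout, TimeUnit unit) {
    long remaining = unit.toNanos(timeout);
    _readyLock.lock();
    try {
      while (true) {
        if (_finished) {
          return Outcome.FINISHED;
        }
        if (_earlyTerminated) {
          return Outcome.EARLY_TERMINATED;  // the predicate the review says is missing
        }
        if (_ready) {
          return Outcome.READY;
        }
        if (remaining <= 0) {
          return Outcome.TIMED_OUT;
        }
        remaining = _readyCond.awaitNanos(remaining);
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return Outcome.INTERRUPTED;
    } finally {
      _readyLock.unlock();
    }
  }
}
```

Because each flag is written before `wakeWaiters()` takes the lock, and the waiter checks the flags under that same lock, a signal cannot slip in between the check and the park.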



##########
pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkGrpcMailboxSend.java:
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.SendingMailbox;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+/// A/B benchmark for the gRPC sender-side back-pressure gate in `GrpcSendingMailbox`.
+///
+/// Two real `MailboxService` instances run on localhost; a background drainer thread polls the
+/// receiver as fast as it can. Crucially, the **drainer is rate-limited** (`@Param drainSleepMicros`)

Review Comment:
   The Javadoc advertises a `@Param drainSleepMicros` that throttles the 
drainer, but no such field exists — the drainer (lines 130–139) spin-polls with 
`Thread.onSpinWait()`. At small payloads the drainer keeps `isReady() == true` 
permanently, so the A/B between `backpressureEnabled=true/false` measures only 
one extra volatile read on the fast path; the slow-path park/wake the docstring 
claims to exercise is never hit. Either add `@Param public int 
_drainSleepMicros;` and `LockSupport.parkNanos(_drainSleepMicros * 1000L)` in 
the drainer, or correct the docstring.
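One way to make the docstring true, sketched with the JDK only: `RateLimitedDrainer` is hypothetical, and a `BlockingQueue` stands in for the receiving mailbox. In the benchmark, the sleep would come from the suggested `@Param public int _drainSleepMicros;` field rather than a constructor argument; a value of 0 keeps the current spin-poll behaviour.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

// Hypothetical drainer; in the JMH class drainSleepMicros would be a @Param field
// and queue.poll() would be the receiving mailbox's poll.
class RateLimitedDrainer {
  private final BlockingQueue<?> _queue;
  private final long _drainSleepNanos;
  private final AtomicBoolean _running = new AtomicBoolean();
  private final AtomicLong _drained = new AtomicLong();
  private Thread _thread;

  RateLimitedDrainer(BlockingQueue<?> queue, int drainSleepMicros) {
    _queue = queue;
    _drainSleepNanos = drainSleepMicros * 1000L;
  }

  void start() {
    _running.set(true);
    _thread = new Thread(() -> {
      while (_running.get()) {
        if (_queue.poll() != null) {
          _drained.incrementAndGet();
        }
        if (_drainSleepNanos > 0) {
          // Throttle: a fast sender can now outpace this reader, letting isReady()
          // go false so the slow path (park until on-ready fires) is exercised.
          LockSupport.parkNanos(_drainSleepNanos);
        } else {
          Thread.onSpinWait();  // current behaviour: spin-poll as fast as possible
        }
      }
    }, "drainer");
    _thread.setDaemon(true);
    _thread.start();
  }

  void stop() {
    _running.set(false);
    LockSupport.unpark(_thread);  // cut a pending park short
    try {
      _thread.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  long drained() {
    return _drained.get();
  }
}
```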



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java:
##########
@@ -122,4 +123,14 @@ public boolean resetConnectBackoff(String hostname, int port) {
   private NettyChannelBuilder decorate(NettyChannelBuilder builder) {
     return builder.idleTimeout(_idleTimeout.getSeconds(), TimeUnit.SECONDS);
   }
+
+  /// Exposes the metric view of the shared gRPC client allocator. The returned
+  /// metric covers every channel managed by this instance and reports both
+  /// `usedDirectMemory()` and `usedHeapMemory()`, so it remains meaningful
+  /// regardless of whether Netty is configured to prefer direct or heap buffers
+  /// (e.g. `-Dio.netty.noPreferDirect=true`). Consumed by [MailboxService] to
+  /// register the `MAILBOX_CLIENT_USED_*` gauges.
+  public PooledByteBufAllocatorMetric getBufAllocatorMetric() {

Review Comment:
   Returning 
`io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocatorMetric` from a 
public method exposes a shaded internal type as part of `ChannelManager`'s (and 
symmetrically `GrpcMailboxServer.getBufAllocatorMetric()` at line 174) public 
API. The only caller is `MailboxService`, which immediately reduces it to 
`usedDirectMemory()` / `usedHeapMemory()` longs. Existing precedents in 
`GrpcQueryServer` / `BrokerGrpcServer` keep the shaded metric type confined to 
the class that owns the allocator. Consider replacing with `long 
usedDirectMemoryBytes()` / `long usedHeapMemoryBytes()` getters here and on 
`GrpcMailboxServer`.
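A sketch of the narrower API shape, assuming a hypothetical `ShadedAllocatorMetric` interface in place of the shaded Netty metric type and a hypothetical `GaugeRegistrySketch` in place of Pinot's gauge registration: the owning class keeps the metric private and exposes only `long` getters, which a consumer can register as `LongSupplier` method references without ever naming the shaded type.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical stand-in for the shaded PooledByteBufAllocatorMetric; the point is
// that this type never appears in a public signature.
interface ShadedAllocatorMetric {
  long usedDirectMemory();

  long usedHeapMemory();
}

// Sketch of ChannelManager (or GrpcMailboxServer) confining the shaded type.
class ChannelManagerSketch {
  private final ShadedAllocatorMetric _metric;  // stays private to the owner

  ChannelManagerSketch(ShadedAllocatorMetric metric) {
    _metric = metric;
  }

  public long usedDirectMemoryBytes() {
    return _metric.usedDirectMemory();
  }

  public long usedHeapMemoryBytes() {
    return _metric.usedHeapMemory();
  }
}

// Hypothetical consumer: gauges only need a LongSupplier, so method references
// on the owner are enough and MailboxService never imports the shaded class.
class GaugeRegistrySketch {
  final Map<String, LongSupplier> gauges = new LinkedHashMap<>();

  void register(String name, LongSupplier supplier) {
    gauges.put(name, supplier);
  }
}
```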



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -190,6 +236,37 @@ public boolean resetConnectBackoff(String hostname, int port) {
     return _channelManager.resetConnectBackoff(hostname, port);
   }
 
+  /// Current value of [BrokerGauge#MAILBOX_CLIENT_USED_DIRECT_MEMORY] /
+  /// [ServerGauge#MAILBOX_CLIENT_USED_DIRECT_MEMORY] — bytes pinned by the
+  /// shared gRPC client allocator backing every [GrpcSendingMailbox] created
+  /// from this service.
+  public long getMailboxClientUsedDirectMemoryBytes() {

Review Comment:
   These four accessors (lines 243–267) are used only by the new repro test and
the JMH benchmark, but they're declared `public` on `MailboxService`, which
permanently widens its surface and duplicates what the registered gauges
already expose. Annotating them with `@VisibleForTesting` is the convention used
elsewhere in Pinot for this pattern. Alternatively, they could move onto
`ChannelManager` / `GrpcMailboxServer`; note, though, that `ChannelManager`
lives in `org.apache.pinot.query.mailbox.channel`, so package-private accessors
there would not be visible to the repro test (`org.apache.pinot.query.mailbox`)
or to the benchmark (`org.apache.pinot.perf`).



##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureReproTest.java:
##########
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+/// Validates the sender-side gRPC back-pressure described in `grpc-oom-analysis.md`.
+///
+/// A "fast sender" tries to push the same small data block over and over on the
+/// test thread while a "slow reader" thread polls the receiving mailbox at
+/// roughly 50 blocks per second. With back-pressure in place, the sender thread
+/// blocks inside [GrpcSendingMailbox.awaitReady] whenever the gRPC outbound
+/// queue fills, so the send rate tracks the polling rate plus a bounded
+/// in-flight pipeline (gRPC HTTP/2 stream window + Netty WriteQueue + the
+/// receiver's bounded mailbox queue).
+///
+/// We assert two complementary properties:
+///  1. `sendCount` is bounded by `polledCount` plus a generous in-flight
+///     allowance — pre-fix the ratio was ~1700x, which would still fail this
+///     check by orders of magnitude.
+///  2. The peak growth of the sender's client allocator stays under a small
+///     constant — pre-fix it reached 50+ MB in 3 s, post-fix we expect at most
+///     one or two Netty pool chunks.
+///
+/// The thresholds are intentionally generous; tightening them would require
+/// per-channel Netty watermark tuning, which is deferred to a follow-up.
+public class GrpcSenderBackpressureReproTest {

Review Comment:
   The new kill-switch (`pinot.query.runner.grpc.sender.backpressure.enabled`)
is the production rollback for this PR but has no CI coverage: the JMH bench
covers both values, but JMH does not run in CI. A refactor that broke the
`bypassReady || !_backpressureEnabled` short-circuit at
`GrpcSendingMailbox.java:351` would still pass CI. It would be worth adding a
small companion test that builds a `MailboxService` with the flag off and
asserts that the gate is verifiably bypassed (e.g. the sender pace far exceeds
the slow reader, mirroring the pre-fix repro).
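A deterministic sketch of the assertion such a companion test could make, assuming a reduced model: `GateModel`, `pushUntilBlocked` and the fixed high-water mark are hypothetical, and the real test would presumably build `MailboxService` from a `PinotConfiguration` with the flag set. With a stalled reader, the flag off lets far more blocks through than the high-water mark; the flag on stops the sender at the mark; and an EOS (`bypassReady = true`) skips the gate either way.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the sender-side gate, reduced to the short-circuit the
// kill-switch controls. isReady() is emulated with an in-flight counter and a
// fixed high-water mark; nothing drains it, i.e. the reader is stalled.
class GateModel {
  private final boolean _backpressureEnabled;  // the kill-switch value
  private final int _highWaterMark;
  final AtomicInteger inFlight = new AtomicInteger();

  GateModel(boolean backpressureEnabled, int highWaterMark) {
    _backpressureEnabled = backpressureEnabled;
    _highWaterMark = highWaterMark;
  }

  private boolean isReady() {
    return inFlight.get() < _highWaterMark;
  }

  // Returns false if the gate stayed closed past the timeout.
  boolean send(boolean bypassReady, long timeoutMillis) {
    boolean skipGate = bypassReady || !_backpressureEnabled;  // the short-circuit under test
    if (!skipGate) {
      long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
      while (!isReady()) {
        if (System.nanoTime() >= deadline) {
          return false;
        }
        Thread.onSpinWait();
      }
    }
    inFlight.incrementAndGet();
    return true;
  }

  // Pushes up to `attempts` data blocks, stopping at the first send that times out.
  int pushUntilBlocked(int attempts, long timeoutMillis) {
    int sent = 0;
    while (sent < attempts && send(false, timeoutMillis)) {
      sent++;
    }
    return sent;
  }
}
```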



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
