yashmayya commented on code in PR #18519:
URL: https://github.com/apache/pinot/pull/18519#discussion_r3262636007
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -102,7 +128,13 @@ public void send(MseBlock.Data data) {
@Override
public void send(MseBlock.Eos block, List<DataBuffer> serializedStats) {
- if (sendInternal(block, serializedStats)) {
+ // EOS blocks (success or error) carry control-plane info — including the original error code on the error path —
+ // and must always reach the receiver, so they bypass the back-pressure gate. Bypassing also disables the
+ // cooperative termination poll inside [#awaitReady]; without it, a terminate signal raised while the sender is
+ // mid-way through pushing an error EOS would unwind [#sendInternal] with a TerminationException, leave
+ // [#_senderSideClosed] false, and let [#cancel] run and overwrite the original error code with
+ // QUERY_CANCELLATION on the receiver side.
+ if (sendInternal(block, serializedStats, /* bypassReady */ true)) {
LOGGER.debug("Completing mailbox: {}", _id);
_contentObserver.onCompleted();
Review Comment:
On the success path, `_contentObserver.onCompleted()` runs before
`_senderSideClosed = true`. If a concurrent `cancel()` (from an OpChain
on-failure callback or an external watchdog) arrives in that window,
`isTerminated()` still returns false, so `cancel()` proceeds to push its own
error EOS via `_contentObserver.onNext(...)` *after* `onCompleted()`, and gRPC
then throws `IllegalStateException`. Swapping the two statements, so that
`_senderSideClosed = true` is set before `onCompleted()` is called, fixes this.

Separately: after `awaitReady` returns true in `sendContent` (line 320)
there is no `isTerminated()` re-check before `_contentObserver.onNext(content)`
at line 330, so the data-block sender can still race a concurrent cancel that
calls `onNext`/`onCompleted` on the same (non-thread-safe)
`ClientCallStreamObserver`. Consider serializing the terminal calls under
`_readyLock`, or re-checking `_senderSideClosed` after `awaitReady`.
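To make the suggestion concrete, here is a minimal sketch, not the PR's code. `RecordingObserver` and `GuardedSender` are hypothetical stand-ins for the gRPC observer and the sending mailbox. It assumes a single lock guards both the closed flag and every observer call, and that the flag is flipped before `onCompleted()`:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical stand-in for a non-thread-safe observer that rejects calls after completion. */
class RecordingObserver {
  final List<String> calls = new ArrayList<>();
  private boolean completed;

  void onNext(String msg) {
    if (completed) {
      throw new IllegalStateException("onNext after onCompleted");
    }
    calls.add("onNext:" + msg);
  }

  void onCompleted() {
    if (completed) {
      throw new IllegalStateException("onCompleted called twice");
    }
    completed = true;
    calls.add("onCompleted");
  }
}

/** Hypothetical sender: the closed flag and all observer calls share one lock. */
class GuardedSender {
  private final ReentrantLock lock = new ReentrantLock();
  private final RecordingObserver observer;
  private boolean senderSideClosed;

  GuardedSender(RecordingObserver observer) {
    this.observer = observer;
  }

  /** Re-checks the closed flag under the lock before touching the observer. */
  boolean sendData(String block) {
    lock.lock();
    try {
      if (senderSideClosed) {
        return false;
      }
      observer.onNext(block);
      return true;
    } finally {
      lock.unlock();
    }
  }

  /** Success path: mark closed first, then complete the stream. */
  boolean complete() {
    lock.lock();
    try {
      if (senderSideClosed) {
        return false;
      }
      senderSideClosed = true;
      observer.onCompleted();
      return true;
    } finally {
      lock.unlock();
    }
  }

  /** Cancel path: becomes a no-op once the stream has been closed. */
  boolean cancel(String error) {
    lock.lock();
    try {
      if (senderSideClosed) {
        return false;
      }
      senderSideClosed = true;
      observer.onNext("error:" + error);
      observer.onCompleted();
      return true;
    } finally {
      lock.unlock();
    }
  }
}
```

With this shape a late `cancel()` after `complete()` returns false instead of calling `onNext` on a completed stream. The sketch leaves out the blocking wait; in the real class the wait would have to release the lock (as a `Condition` does) so that cancel can still make progress.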
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -208,17 +263,65 @@ public boolean isTerminated() {
return _senderSideClosed || _statusObserver.isFinished();
}
- private StreamObserver<MailboxContent> getContentObserver() {
+ private ClientCallStreamObserver<MailboxContent> getContentObserver() {
Metadata metadata = new Metadata();
metadata.put(ChannelUtils.MAILBOX_ID_METADATA_KEY, _id);
- return PinotMailboxGrpc.newStub(_channelManager.getChannel(_hostname, _port))
+ // We wrap `_statusObserver` in a ClientResponseObserver so we can register the on-ready handler through
+ // `beforeStart` — gRPC rejects setOnReadyHandler() if it is called after open() returns. Wrapping (rather than
+ // making MailboxStatusObserver itself a ClientResponseObserver) keeps the back-pressure plumbing local to this
+ // class. The wrapper delegates the data callbacks unchanged, and signals our `_readyCond` on stream close so a
+ // blocked sender wakes up to observe `_statusObserver.isFinished()` becoming true.
+ ClientResponseObserver<MailboxContent, MailboxStatus> responseObserver =
+ new ClientResponseObserver<MailboxContent, MailboxStatus>() {
+ @Override
+ public void beforeStart(ClientCallStreamObserver<MailboxContent> requestStream) {
+ // Fires on a gRPC channel/Netty thread whenever isReady() transitions false -> true. Just signal; the
+ // sender re-checks the predicate after waking.
+ requestStream.setOnReadyHandler(GrpcSendingMailbox.this::wakeWaiters);
+ }
+
+ @Override
+ public void onNext(MailboxStatus value) {
+ _statusObserver.onNext(value);
Review Comment:
The wrapper calls `wakeWaiters()` from `onError`/`onCompleted` but not from
`onNext`. When the receiver requests early termination
(`MAILBOX_METADATA_REQUEST_EARLY_TERMINATE`), it does *not* close the stream —
`MailboxContentObserver` only sets `WAITING_EOS` and keeps the stream open for
the sender's EOS. A sender blocked in `awaitReady` on a congested stream
therefore won't observe the early-terminate flip until either the stream closes
(it won't, in this case) or the deadline elapses, because `awaitReady` does not
check `isEarlyTerminated()`. Calling `wakeWaiters()` here (after delegating to
`_statusObserver.onNext`) lets the sender wake and re-check terminal predicates
promptly.
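A rough sketch of the wake-and-re-check behaviour described above, under stated assumptions: `ReadyGateSketch`, `onStatus`, and `onReady` are hypothetical names, and the flags stand in for `isReady()` and `isEarlyTerminated()`. The status callback signals the condition, and the wait loop treats early termination as a reason to stop waiting:

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical gate: a sender parks here until the stream is ready or a terminal flag flips. */
class ReadyGateSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition readyCond = lock.newCondition();
  private volatile boolean ready;
  private volatile boolean earlyTerminated;

  /** Mirrors the status onNext path: record the flag, then wake any blocked sender. */
  void onStatus(boolean earlyTerminate) {
    if (earlyTerminate) {
      earlyTerminated = true;
    }
    wakeWaiters();
  }

  /** Mirrors the on-ready handler. */
  void onReady() {
    ready = true;
    wakeWaiters();
  }

  private void wakeWaiters() {
    lock.lock();
    try {
      readyCond.signalAll();
    } finally {
      lock.unlock();
    }
  }

  /** Returns true once ready; false on early termination, deadline, or interrupt. */
  boolean awaitReady(long timeoutMs) {
    long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    lock.lock();
    try {
      while (!ready) {
        // Re-checked after every wake-up, so a signalled waiter sees the flip promptly.
        if (earlyTerminated || remainingNanos <= 0) {
          return false;
        }
        remainingNanos = readyCond.awaitNanos(remainingNanos);
      }
      return true;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    } finally {
      lock.unlock();
    }
  }
}
```

Because the flag is written before the signal and the waiter checks it while holding the lock, a wake-up cannot be lost between the check and the park.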
##########
pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkGrpcMailboxSend.java:
##########
@@ -0,0 +1,199 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.SendingMailbox;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+/// A/B benchmark for the gRPC sender-side back-pressure gate in `GrpcSendingMailbox`.
+///
+/// Two real `MailboxService` instances run on localhost; a background drainer thread polls the
+/// receiver as fast as it can. Crucially, the **drainer is rate-limited** (`@Param drainSleepMicros`)
Review Comment:
The Javadoc advertises a `@Param drainSleepMicros` that throttles the
drainer, but no such field exists — the drainer (lines 130–139) spin-polls with
`Thread.onSpinWait()`. At small payloads the drainer keeps `isReady() == true`
permanently, so the A/B between `backpressureEnabled=true/false` measures only
one extra volatile read on the fast path; the slow-path park/wake the docstring
claims to exercise is never hit. Either add `@Param public int
_drainSleepMicros;` and `LockSupport.parkNanos(_drainSleepMicros * 1000L)` in
the drainer, or correct the docstring.
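For the first option, a throttled drainer loop could look roughly like the sketch below. `ThrottledDrainer` is a hypothetical class, the queue stands in for the receiving mailbox, and the constructor argument plays the role of the proposed `_drainSleepMicros` parameter; a value of 0 keeps the current spin-polling behaviour:

```java
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;

/** Hypothetical rate-limited drainer: polls a queue and pauses between polls. */
class ThrottledDrainer implements Runnable {
  private final Queue<Object> queue;
  private final int drainSleepMicros;
  private final AtomicBoolean running = new AtomicBoolean(true);
  private final AtomicLong drained = new AtomicLong();

  ThrottledDrainer(Queue<Object> queue, int drainSleepMicros) {
    this.queue = queue;
    this.drainSleepMicros = drainSleepMicros;
  }

  @Override
  public void run() {
    while (running.get()) {
      if (queue.poll() != null) {
        drained.incrementAndGet();
      }
      if (drainSleepMicros > 0) {
        // Slow consumer: lets the producer side fill up so the sender's slow path is exercised.
        LockSupport.parkNanos(drainSleepMicros * 1000L);
      } else {
        // Unthrottled: same spin-poll as the current benchmark.
        Thread.onSpinWait();
      }
    }
  }

  void stop() {
    running.set(false);
  }

  long drained() {
    return drained.get();
  }
}
```

`parkNanos` may return early, so the pause is a lower-effort throttle rather than an exact rate; for an A/B comparison that is probably sufficient as long as both arms use the same value.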
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/channel/ChannelManager.java:
##########
@@ -122,4 +123,14 @@ public boolean resetConnectBackoff(String hostname, int port) {
private NettyChannelBuilder decorate(NettyChannelBuilder builder) {
return builder.idleTimeout(_idleTimeout.getSeconds(), TimeUnit.SECONDS);
}
+
+ /// Exposes the metric view of the shared gRPC client allocator. The returned
+ /// metric covers every channel managed by this instance and reports both
+ /// `usedDirectMemory()` and `usedHeapMemory()`, so it remains meaningful
+ /// regardless of whether Netty is configured to prefer direct or heap buffers
+ /// (e.g. `-Dio.netty.noPreferDirect=true`). Consumed by [MailboxService] to
+ /// register the `MAILBOX_CLIENT_USED_*` gauges.
+ public PooledByteBufAllocatorMetric getBufAllocatorMetric() {
Review Comment:
Returning
`io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocatorMetric` from a
public method makes a shaded internal type part of `ChannelManager`'s public
API; `GrpcMailboxServer.getBufAllocatorMetric()` at line 174 does the same.
The only caller is `MailboxService`, which immediately reduces it to the
`usedDirectMemory()` / `usedHeapMemory()` longs. Existing precedents in
`GrpcQueryServer` / `BrokerGrpcServer` keep the shaded metric type confined to
the class that owns the allocator. Consider replacing it with `long
usedDirectMemoryBytes()` / `long usedHeapMemoryBytes()` getters here and on
`GrpcMailboxServer`.
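The shape I have in mind is sketched below. `ShadedAllocatorMetric` is a hypothetical stand-in for the shaded Netty metric type (so the snippet compiles on its own), and `ChannelManagerSketch` is illustrative only; the two getter names are the ones proposed above:

```java
/** Hypothetical stand-in for the shaded Netty allocator metric type. */
interface ShadedAllocatorMetric {
  long usedDirectMemory();

  long usedHeapMemory();
}

/** Illustrative owner class: the metric object never leaves it, only plain longs do. */
class ChannelManagerSketch {
  private final ShadedAllocatorMetric metric;

  ChannelManagerSketch(ShadedAllocatorMetric metric) {
    this.metric = metric;
  }

  /** Bytes of direct memory currently used by the shared client allocator. */
  long usedDirectMemoryBytes() {
    return metric.usedDirectMemory();
  }

  /** Bytes of heap memory currently used by the shared client allocator. */
  long usedHeapMemoryBytes() {
    return metric.usedHeapMemory();
  }
}
```

Callers such as the gauge registration then depend only on `long` values, so a change in the shaded package name or metric class would stay local to the owning class.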
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/MailboxService.java:
##########
@@ -190,6 +236,37 @@ public boolean resetConnectBackoff(String hostname, int port) {
return _channelManager.resetConnectBackoff(hostname, port);
}
+ /// Current value of [BrokerGauge#MAILBOX_CLIENT_USED_DIRECT_MEMORY] /
+ /// [ServerGauge#MAILBOX_CLIENT_USED_DIRECT_MEMORY] — bytes pinned by the
+ /// shared gRPC client allocator backing every [GrpcSendingMailbox] created
+ /// from this service.
+ public long getMailboxClientUsedDirectMemoryBytes() {
Review Comment:
These four accessors (lines 243–267) are used only by the new repro test and
the JMH benchmark, but they're declared `public` on `MailboxService` —
permanently widening its surface and duplicating what the registered gauges
already expose. Annotating with `@VisibleForTesting` is the convention used
elsewhere in Pinot for this pattern; alternatively, moving them onto
`ChannelManager` / `GrpcMailboxServer` as package-private would keep the test
access local.
##########
pinot-query-runtime/src/test/java/org/apache/pinot/query/mailbox/GrpcSenderBackpressureReproTest.java:
##########
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.mailbox;
+
+import java.util.Locale;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.planner.physical.MailboxIdUtils;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
+import org.apache.pinot.query.testutils.QueryTestUtils;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertTrue;
+
+
+/// Validates the sender-side gRPC back-pressure described in `grpc-oom-analysis.md`.
+///
+/// A "fast sender" tries to push the same small data block over and over on the
+/// test thread while a "slow reader" thread polls the receiving mailbox at
+/// roughly 50 blocks per second. With back-pressure in place, the sender thread
+/// blocks inside [GrpcSendingMailbox.awaitReady] whenever the gRPC outbound
+/// queue fills, so the send rate tracks the polling rate plus a bounded
+/// in-flight pipeline (gRPC HTTP/2 stream window + Netty WriteQueue + the
+/// receiver's bounded mailbox queue).
+///
+/// We assert two complementary properties:
+/// 1. `sendCount` is bounded by `polledCount` plus a generous in-flight
+/// allowance — pre-fix the ratio was ~1700x, which would still fail this
+/// check by orders of magnitude.
+/// 2. The peak growth of the sender's client allocator stays under a small
+/// constant — pre-fix it reached 50+ MB in 3 s, post-fix we expect at most
+/// one or two Netty pool chunks.
+///
+/// The thresholds are intentionally generous; tightening them would require
+/// per-channel Netty watermark tuning, which is deferred to a follow-up.
+public class GrpcSenderBackpressureReproTest {
Review Comment:
The new kill-switch (`pinot.query.runner.grpc.sender.backpressure.enabled`)
is the production rollback for this PR but has no CI coverage — the JMH bench
covers both values but JMH does not run in CI. A refactor that broke the
`bypassReady || !_backpressureEnabled` short-circuit at
`GrpcSendingMailbox.java:351` would pass. Worth adding a small companion test
that builds a `MailboxService` with the flag off and asserts the gate is
verifiably bypassed (e.g. sender pace far exceeds the slow reader, mirroring
the pre-fix repro).
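The real companion test would need two `MailboxService` instances, so the snippet below only illustrates the property it should pin down. `BackpressureGateSketch` and `mayWriteNow` are hypothetical; the only thing taken from the PR is the `bypassReady || !_backpressureEnabled` short-circuit:

```java
import java.util.function.BooleanSupplier;

/** Hypothetical model of the gate decision; not the PR's class. */
class BackpressureGateSketch {
  private final boolean backpressureEnabled;
  private final BooleanSupplier streamReady;

  BackpressureGateSketch(boolean backpressureEnabled, BooleanSupplier streamReady) {
    this.backpressureEnabled = backpressureEnabled;
    this.streamReady = streamReady;
  }

  /** Returns true when the caller may write immediately; false means the real sender would park. */
  boolean mayWriteNow(boolean bypassReady) {
    if (bypassReady || !backpressureEnabled) {
      // Kill-switch off or EOS bypass: never consult stream readiness.
      return true;
    }
    return streamReady.getAsBoolean();
  }
}
```

A test against this shape would assert three cases on a stream that is never ready: with the flag off every data write passes, with the flag on a data write is held back, and with the flag on an EOS write (bypass set) still passes.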
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]