gortiz opened a new pull request, #18519:
URL: https://github.com/apache/pinot/pull/18519

   > **Draft.** This PR ships only the observability + reproducer that we need 
to discuss the fix. The actual sender-side flow-control change is intentionally 
**not** in this commit — see _What's missing_ below. Do not merge in this state.
   
   ## Why
   
   Customer hit `OutOfDirectMemoryError` inside `MessageFramer.writeRaw` from 
`GrpcSendingMailbox.sendContent`:
   
   ```
   failed to allocate 4194304 byte(s) of direct memory (used: 25163727127, max: 25165824000)
     at io.grpc.netty.shaded.io.netty.util.internal.PlatformDependent.incrementMemoryCounter(...)
     ...
     at io.grpc.internal.MessageFramer.writeRaw(MessageFramer.java:294)
     ...
     at org.apache.pinot.query.mailbox.GrpcSendingMailbox.sendContent(GrpcSendingMailbox.java:227)
   ```
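
The numbers in the error message show how little headroom was left when the allocation failed. The arithmetic, as a small sketch (the class and method names are only for illustration):

```java
public class DirectMemoryHeadroom {
    /** Bytes still available under the direct-memory cap. */
    static long headroom(long usedBytes, long maxBytes) {
        return maxBytes - usedBytes;
    }

    public static void main(String[] args) {
        long used = 25_163_727_127L;  // "used" from the error message
        long max = 25_165_824_000L;   // "max" from the error message (24000 MiB)
        long requested = 4_194_304L;  // the 4 MiB allocation that failed

        long free = headroom(used, max);
        System.out.println("headroom bytes = " + free);                 // 2096873
        System.out.println("request fits   = " + (requested <= free));  // false
    }
}
```

Roughly 2 MiB was left under the cap, so the 4 MiB request could not be satisfied.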
   
   Root cause: `GrpcSendingMailbox` calls `StreamObserver.onNext(...)` 
unconditionally on every chunk. The runtime object is a 
`ClientCallStreamObserver` exposing `isReady()` / `setOnReadyHandler()`, but we 
don't use either. The receiver-side `MailboxStatusObserver` already writes back 
buffer-size metadata to the sender, but it is explicitly thrown away:
   
   ```java
   // pinot-query-runtime/.../channel/MailboxStatusObserver.java:53
   // TODO: this feedback info is not used to throttle the send speed. it is currently being discarded.
   ```
   
   When the receiver drains slower than the sender writes (slow consumer, full 
mailbox queue, large fan-out, skewed hash shuffle, etc.) the proto chunks pile 
up in gRPC Netty's outbound queue until the JVM-wide direct-memory cap is hit.
   
   ## What this PR adds
   
   ### New gauges (`MAILBOX_CLIENT_USED_*`)
   
   `BrokerGauge` and `ServerGauge` gain two entries each:
   
   | Gauge | Source |
   |---|---|
   | `MAILBOX_CLIENT_USED_DIRECT_MEMORY` | `ChannelManager._bufAllocator.metric().usedDirectMemory()` |
   | `MAILBOX_CLIENT_USED_HEAP_MEMORY`   | `ChannelManager._bufAllocator.metric().usedHeapMemory()` |
   
   These mirror the existing `MAILBOX_SERVER_USED_*` gauges that `GrpcMailboxServer` already publishes for the inbound (server) allocator. Today there is **no per-process visibility into the gRPC client outbound pool**, which is exactly the pool that exhausts in this failure mode; the new gauges close that gap.
   
   `MailboxService` registers them in its constructor, picking `BrokerMetrics` 
vs `ServerMetrics` based on `InstanceType`, mirroring `GrpcMailboxServer`'s 
existing pattern. Both direct and heap are reported so the numbers remain 
meaningful regardless of `-Dio.netty.noPreferDirect`.
   
   `MailboxService` also exposes the same values via four accessors 
(`getMailbox{Client,Server}Used{Direct,Heap}MemoryBytes()`) so tests and other 
internal callers can read the gauge values without going through the metrics 
registry.
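
To illustrate the callback-gauge pattern described above without pulling in Pinot or Netty, here is a stdlib-only sketch. It is not the code in this PR: `AllocatorMetric` is a stand-in that only copies the `usedDirectMemory()` / `usedHeapMemory()` method names from the table, and `GaugeRegistry`, `registerClientGauges`, and `FakeMetric` are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

public class ClientGaugeSketch {
    /** Stand-in for the allocator metric; method names follow the table above. */
    interface AllocatorMetric {
        long usedDirectMemory();
        long usedHeapMemory();
    }

    /** Hypothetical registry: gauge name -> callback evaluated on every read. */
    static final class GaugeRegistry {
        private final Map<String, LongSupplier> gauges = new LinkedHashMap<>();

        void register(String name, LongSupplier supplier) {
            gauges.put(name, supplier);
        }

        long read(String name) {
            return gauges.get(name).getAsLong();
        }
    }

    /** Registers both gauges as callbacks so reads always see the live value. */
    static void registerClientGauges(GaugeRegistry registry, AllocatorMetric metric) {
        registry.register("MAILBOX_CLIENT_USED_DIRECT_MEMORY", metric::usedDirectMemory);
        registry.register("MAILBOX_CLIENT_USED_HEAP_MEMORY", metric::usedHeapMemory);
    }

    /** Mutable fake used only to drive the example. */
    static final class FakeMetric implements AllocatorMetric {
        volatile long direct;
        volatile long heap;

        public long usedDirectMemory() { return direct; }
        public long usedHeapMemory() { return heap; }
    }

    public static void main(String[] args) {
        GaugeRegistry registry = new GaugeRegistry();
        FakeMetric metric = new FakeMetric();
        registerClientGauges(registry, metric);

        metric.direct = 54_525_952L; // value taken from the sample run in this description
        System.out.println(registry.read("MAILBOX_CLIENT_USED_DIRECT_MEMORY"));
        System.out.println(registry.read("MAILBOX_CLIENT_USED_HEAP_MEMORY"));
    }
}
```

Registering a callback rather than a snapshot means the gauge and any direct accessor read the same live allocator value.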
   
   ### `GrpcSenderBackpressureReproTest`
   
   A new TestNG test in `pinot-query-runtime/src/test/.../mailbox/` that:
   
   * Stands up two real `MailboxService` instances on localhost (sender side + 
receiver side, full gRPC stack — not mocked).
   * Spawns a **slow reader** thread polling the receiving mailbox every 20 ms 
(~50 blocks/s).
   * Runs a **fast sender** loop on the test thread: `sender.send(block)` in a 
tight loop with the same small `RowHeapDataBlock` reused over and over. No 
large blocks — just a lot of them.
   * After 3 seconds, asserts the sender pushed at least 10x more blocks than 
the receiver polled (in practice the ratio is ~1700x on a developer laptop).
   * Prints both client and server pool memory through the new gauges so the 
failure mode is visible in CI output.
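
The shape of the test can be illustrated without gRPC. The following is a stdlib-only sketch, not the test in this PR: an unbounded `ConcurrentLinkedQueue` stands in for the outbound queue, and `UnboundedSenderSim`, `run`, and the `maxSends` cap (added only to keep the demo's memory bounded) are hypothetical.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class UnboundedSenderSim {
    /** Runs a fast sender against a slow reader; returns {sent, polled}. */
    static long[] run(long durationMs, long pollIntervalMs, long maxSends) {
        // Stand-in for an outbound queue that never pushes back on the sender.
        ConcurrentLinkedQueue<Integer> outbound = new ConcurrentLinkedQueue<>();
        AtomicLong polled = new AtomicLong();
        AtomicBoolean running = new AtomicBoolean(true);

        // Slow reader: takes at most one block per poll interval.
        Thread reader = new Thread(() -> {
            while (running.get()) {
                if (outbound.poll() != null) {
                    polled.incrementAndGet();
                }
                try {
                    Thread.sleep(pollIntervalMs);
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        reader.start();

        // Fast sender: tight loop reusing the same small payload.
        Integer block = 42;
        long sent = 0;
        long deadline = System.nanoTime() + durationMs * 1_000_000L;
        while (System.nanoTime() < deadline && sent < maxSends) {
            outbound.offer(block);
            sent++;
        }

        running.set(false);
        reader.interrupt();
        try {
            reader.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new long[] {sent, polled.get()};
    }

    public static void main(String[] args) {
        long[] result = run(200, 20, 500_000);
        System.out.println("sent=" + result[0] + " polled=" + result[1]);
    }
}
```

Because nothing ever makes the sender wait, the gap between `sent` and `polled` grows with run time; in the real stack that gap is held in direct memory.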
   
   Representative output from a local run:
   
   ```
   [GrpcSenderBackpressureReproTest] sent=197022 polled=106 ratio=1858.7x
     sender   MAILBOX_CLIENT_USED_*: direct=54525952B heap=0B (peak growth=54525952B)
     receiver MAILBOX_SERVER_USED_*: direct=8388608B  heap=0B (peak growth=8388608B)
   ```
   
   The test's purpose for now is to make the bug **demonstrable** and 
**measurable**, not to gate merges on memory thresholds (those are flaky across 
hosts).
   
   ## What's missing (and intentionally so)
   
   The actual sender-side backpressure mechanism. Once we agree on the approach 
— most likely a `ClientCallStreamObserver` cast in 
`GrpcSendingMailbox.getContentObserver()` plus an `isReady()`-gated wait in 
`sendContent()`, optionally combined with the receiver buffer-size feedback 
that's already being sent — this PR will be updated to include:
   
   * The flow-control change in `GrpcSendingMailbox`.
   * An inversion of the assertion in `GrpcSenderBackpressureReproTest`: after 
the fix the sender's send count should stay close to the receiver's polled 
count plus a small in-flight constant. The current "outpaces by ≥10x" assertion 
is left as a TODO comment in the test for that reason.
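
As a discussion aid, here is a minimal sketch of what an `isReady()`-gated wait could look like. It is not the proposed implementation: `ReadyAwareStream` is a hypothetical stand-in that copies only the `isReady()` / `setOnReadyHandler()` / `onNext()` shape of `ClientCallStreamObserver`, and `FakeStream` (with its capacity and `drain()`) exists only so the example runs without gRPC. It assumes a single sending thread per stream.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ReadyGateSketch {
    /** Hypothetical stand-in for the readiness-related part of the stream API. */
    interface ReadyAwareStream<T> {
        boolean isReady();
        void setOnReadyHandler(Runnable handler);
        void onNext(T value);
    }

    /** Parks the sending thread on a Condition until the stream reports ready. */
    static final class ReadyGatedSender<T> {
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition ready = lock.newCondition();
        private final ReadyAwareStream<T> stream;

        ReadyGatedSender(ReadyAwareStream<T> stream) {
            this.stream = stream;
            // Wake the sender whenever the transport signals readiness.
            stream.setOnReadyHandler(() -> {
                lock.lock();
                try {
                    ready.signalAll();
                } finally {
                    lock.unlock();
                }
            });
        }

        /** Returns false if the stream did not become ready within timeoutMs. */
        boolean send(T value, long timeoutMs) {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMs);
            lock.lock();
            try {
                while (!stream.isReady()) {
                    if (remainingNanos <= 0) {
                        return false;
                    }
                    remainingNanos = ready.awaitNanos(remainingNanos);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            } finally {
                lock.unlock();
            }
            stream.onNext(value);
            return true;
        }
    }

    /** Test double: "ready" while fewer than capacity messages are queued. */
    static final class FakeStream implements ReadyAwareStream<String> {
        private final int capacity;
        private int queued;
        private int accepted;
        private Runnable onReady = () -> { };

        FakeStream(int capacity) { this.capacity = capacity; }

        public synchronized boolean isReady() { return queued < capacity; }
        public synchronized void setOnReadyHandler(Runnable handler) { onReady = handler; }
        public synchronized void onNext(String value) { queued++; accepted++; }
        synchronized int accepted() { return accepted; }

        /** Simulates the transport flushing one message, then fires the handler. */
        void drain() {
            Runnable handler;
            synchronized (this) {
                if (queued > 0) {
                    queued--;
                }
                handler = onReady;
            }
            handler.run();
        }
    }

    public static void main(String[] args) {
        FakeStream stream = new FakeStream(2);
        ReadyGatedSender<String> sender = new ReadyGatedSender<>(stream);
        System.out.println(sender.send("a", 20)); // true
        System.out.println(sender.send("b", 20)); // true
        System.out.println(sender.send("c", 20)); // false: stream is full, wait times out
        stream.drain();
        System.out.println(sender.send("c", 20)); // true
    }
}
```

In real gRPC the on-ready handler should be installed before the call starts (on the client side, typically through `ClientResponseObserver.beforeStart`), so the wiring in `getContentObserver()` would differ from the constructor shown here. The timeout is an assumption of the sketch; how a stalled send should surface to the OpChain is part of the discussion.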
   
   ## Design questions to discuss on this PR
   
   1. **Where to apply the wait.** `setOnReadyHandler` + `Condition` on the 
sending thread vs. busy-checking `isReady()` per chunk. The sender currently 
runs on the query-runner thread; a blocking wait there is acceptable because 
the OpChain is already structured around blocking sends.
   2. **Whether to honor the receiver's `buffer_size` feedback** in addition to 
gRPC `isReady()`. The receiver knows about the bounded `ReceivingMailbox` queue 
(default 5 blocks); gRPC's `isReady()` only knows about HTTP/2 + Netty queues. 
Either signal can be the bottleneck — we may want both.
   3. **Cap on outbound queued bytes per channel** (`NettyChannelBuilder` 
write-buffer watermark) as a defense-in-depth limit so a single misbehaving 
stream can't blow the global pool.
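
To make questions 2 and 3 concrete, here is a small sketch of an admission check that combines all three signals. Everything in it is hypothetical: `SendAdmission` and its methods are not Pinot or gRPC APIs, and it assumes the receiver feedback can be interpreted as "free slots remaining in the receiving queue", which is one possible reading of `buffer_size` and not confirmed here.

```java
public class DualSignalGateSketch {
    /** Admits a chunk only if transport, receiver, and byte cap all allow it. */
    static final class SendAdmission {
        private final long maxQueuedBytes;
        private long queuedBytes;
        private int receiverFreeSlots;

        SendAdmission(long maxQueuedBytes, int initialReceiverFreeSlots) {
            this.maxQueuedBytes = maxQueuedBytes;
            this.receiverFreeSlots = initialReceiverFreeSlots;
        }

        /** Assumed meaning of the feedback: free slots in the receiving queue. */
        synchronized void onReceiverFeedback(int freeSlots) {
            receiverFreeSlots = freeSlots;
        }

        /** Called when the transport has flushed bytes off the outbound queue. */
        synchronized void onBytesFlushed(long bytes) {
            queuedBytes = Math.max(0, queuedBytes - bytes);
        }

        synchronized boolean tryAdmit(boolean transportReady, long chunkBytes) {
            if (!transportReady) {
                return false; // HTTP/2 or Netty queue is the bottleneck
            }
            if (receiverFreeSlots <= 0) {
                return false; // receiving queue is the bottleneck
            }
            if (queuedBytes + chunkBytes > maxQueuedBytes) {
                return false; // defense-in-depth cap on outbound bytes
            }
            queuedBytes += chunkBytes;
            receiverFreeSlots--;
            return true;
        }
    }

    public static void main(String[] args) {
        long mib = 1L << 20;
        // 5 slots matches the default queue size mentioned above; 8 MiB is arbitrary.
        SendAdmission admission = new SendAdmission(8 * mib, 5);
        int admitted = 0;
        while (admission.tryAdmit(true, mib)) {
            admitted++;
        }
        System.out.println("admitted before receiver limit: " + admitted); // 5
    }
}
```

Keeping the three checks separate makes it possible to tell which limit is refusing sends, which may also be worth exposing as a metric.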
   
   ## Backwards compatibility
   
   * New gauges: additive only — no rename or removal of existing gauges.
   * New `MailboxService` accessors: additive.
   * `ChannelManager.getBufAllocatorMetric()` is new and `public` (consumed by 
`MailboxService`); `GrpcMailboxServer.getBufAllocatorMetric()` is new and 
`public`.
   * No on-wire protocol change. No config-key change. Rolling upgrades 
unaffected.
   
   ## Test plan
   
   - [x] `GrpcSenderBackpressureReproTest` passes locally.
   - [x] Existing `MailboxServiceTest`, `GrpcSendingMailboxTest` still pass.
   - [ ] CI green on apache/pinot.
   - [ ] Verify `MAILBOX_CLIENT_USED_*` gauges show up under JMX / Prometheus 
on a real broker and server (manual check before un-drafting).
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

