gortiz commented on code in PR #18519:
URL: https://github.com/apache/pinot/pull/18519#discussion_r3271914191


##########
pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java:
##########
@@ -2167,6 +2167,73 @@ public static class MultiStageQueryRunner {
     public static final String KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES = "pinot.query.runner.max.msg.size.bytes";
     public static final int DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES = 16 * 1024 * 1024;
 
+    /**
+     * Whether the sender side of every {@code GrpcSendingMailbox} respects gRPC client-side flow control by waiting
+     * on {@code ClientCallStreamObserver.isReady()} before pushing each chunk.
+     *
+     * <p>Default {@code true}. Set to {@code false} to restore the pre-1.6 behaviour where the sender pushes
+     * unconditionally; useful as a production kill-switch if the gate causes an unexpected regression, and as an
+     * A/B knob for benchmarks (see {@code BenchmarkGrpcMailboxSend}).
+     *
+     * <p>Disabling this flag is what re-introduces the {@code OutOfDirectMemoryError} failure mode the gate exists
+     * to prevent. It is here as a safety valve, not as a recommended setting.
+     */
+    public static final String KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED =
+        "pinot.query.runner.grpc.sender.backpressure.enabled";
+    public static final boolean DEFAULT_GRPC_SENDER_BACKPRESSURE_ENABLED = true;
+
+    /**
+     * Per-stream HTTP/2 flow control window, in bytes. The receiver advertises this value to the sender as
+     * the number of bytes it will accept before requiring a `WINDOW_UPDATE` frame. Wider windows let the
+     * sender push a whole `MseBlock` without {@link io.grpc.stub.ClientCallStreamObserver#isReady} flipping
+     * mid-block. Applied via `NettyServerBuilder.flowControlWindow` in `GrpcMailboxServer`.
+     *
+     * <p>This is per HTTP/2 stream, so total inbound buffering at the receiver scales as
+     * {@code value × #concurrent streams to this server}.
+     */
+    public static final String KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES =
+        "pinot.query.runner.grpc.flow.control.window.bytes";
+    public static final int DEFAULT_GRPC_FLOW_CONTROL_WINDOW_BYTES = 64 * 1024 * 1024;
+
+    /**
+     * Netty per-channel WriteQueue high watermark, in bytes. Applied via
+     * `ChannelOption.WRITE_BUFFER_WATER_MARK` on the sender's `NettyChannelBuilder`. When the channel's
+     * outbound queue exceeds this value, `Channel.isWritable()` flips to `false` and gRPC's
+     * `ClientCallStreamObserver.isReady()` returns `false` until the queue drops below the low watermark.
+     *
+     * <p>This is a per-channel (per `host:port`) setting, shared across all streams to that peer. The
+     * sender's direct-memory footprint is therefore bounded by {@code value × #peers}, not by
+     * {@code value × #streams}. Pairs with {@link #KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES}.
+     */
+    public static final String KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES =
+        "pinot.query.runner.grpc.write.buffer.high.water.mark.bytes";
+    public static final int DEFAULT_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES = 64 * 1024 * 1024;
+
+    /**
+     * Netty per-channel WriteQueue low watermark, in bytes. Once the WriteQueue has exceeded the high
+     * watermark (see {@link #KEY_OF_GRPC_WRITE_BUFFER_HIGH_WATER_MARK_BYTES}), it must drop below this
+     * value before `Channel.isWritable()` flips back to `true`. Conventionally set to ~50% of the high
+     * watermark.
+     */
+    public static final String KEY_OF_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES =
+        "pinot.query.runner.grpc.write.buffer.low.water.mark.bytes";
+    public static final int DEFAULT_GRPC_WRITE_BUFFER_LOW_WATER_MARK_BYTES = 32 * 1024 * 1024;
+
+    /**
+     * Number of inbound gRPC messages the receiver will accept in flight per stream, before requiring the
+     * application to consume one (via {@code MailboxContentObserver.onNext} returning). Implemented by
+     * disabling gRPC's default auto-inbound-flow-control on the server side and calling
+     * {@code ServerCallStreamObserver.request(int)} explicitly.
+     *
+     * <p>Larger values let the sender pipeline more messages without waiting for per-message round trips,
+     * which is the primary throughput knob for small / medium MSE blocks. Memory exposure on the receiver
+     * is still bounded by the HTTP/2 stream window (see
+     * {@link #KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES}), so this credit count is effectively a per-stream
+     * message-count limit on top of the byte-count limit. Whichever fires first applies.
+     */
+    public static final String KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT =
+        "pinot.query.runner.grpc.inbound.message.credit";
+    public static final int DEFAULT_GRPC_INBOUND_MESSAGE_CREDIT = 128;
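
A minimal, JDK-only sketch of the high/low watermark hysteresis described in the Javadoc above, using the PR's default values (64 MiB high, 32 MiB low). `WatermarkGate`, its methods, and the 8 MiB chunk size are hypothetical; this is not Netty's `ChannelOutboundBuffer` or Pinot's sender, only a model of when a sender that checks writability before each chunk would pause and resume. One thing it makes visible: because the check happens before each push, the queue can overshoot the high watermark by up to one chunk.

```java
/**
 * Hypothetical model of a per-channel write-buffer watermark with hysteresis.
 * It only tracks a pending-byte counter; it is not Netty's implementation.
 */
final class WatermarkGate {
  private final long lowWaterMark;
  private final long highWaterMark;
  private long pendingBytes;
  private boolean writable = true;

  WatermarkGate(long lowWaterMark, long highWaterMark) {
    if (lowWaterMark < 0 || lowWaterMark > highWaterMark) {
      throw new IllegalArgumentException("require 0 <= low <= high");
    }
    this.lowWaterMark = lowWaterMark;
    this.highWaterMark = highWaterMark;
  }

  /** Bytes queued for writing; the gate closes once the queue exceeds the high watermark. */
  void enqueue(long bytes) {
    pendingBytes += bytes;
    if (pendingBytes > highWaterMark) {
      writable = false;
    }
  }

  /** Bytes flushed to the socket; the gate reopens only once the queue is below the low watermark. */
  void flushed(long bytes) {
    pendingBytes -= bytes;
    if (pendingBytes < lowWaterMark) {
      writable = true;
    }
  }

  boolean isWritable() {
    return writable;
  }

  long pendingBytes() {
    return pendingBytes;
  }

  public static void main(String[] args) {
    final long mib = 1024L * 1024L;
    // PR defaults: low watermark 32 MiB, high watermark 64 MiB.
    WatermarkGate gate = new WatermarkGate(32 * mib, 64 * mib);

    // Hypothetical sender: check writability, then enqueue one 8 MiB chunk.
    int chunks = 0;
    while (gate.isWritable()) {
      gate.enqueue(8 * mib);
      chunks++;
    }
    // The 9th chunk is accepted at exactly 64 MiB (not yet above the mark) and pushes the queue to 72 MiB.
    System.out.println("paused after " + chunks + " chunks, pending=" + gate.pendingBytes() / mib + " MiB");

    gate.flushed(24 * mib); // 48 MiB pending: between the marks, still paused
    System.out.println("at 48 MiB writable=" + gate.isWritable());
    gate.flushed(24 * mib); // 24 MiB pending: below the low mark, resumes
    System.out.println("at 24 MiB writable=" + gate.isWritable());
  }
}
```

The gap between the two marks keeps writability from toggling on every small flush: once paused, the modelled sender stays paused until the queue drops below 32 MiB.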

Review Comment:
   Yes, this means exactly that.
   
   The maximum number of queued-up messages will be 128 plus the 5 in the blocking queue, but:
   - The ones in the blocking queue are heap-allocated, while the ones held by gRPC should be off-heap (depending on the gRPC implementation).
   - The total size in bytes of the gRPC-held messages is still limited by the window size (`DEFAULT_GRPC_FLOW_CONTROL_WINDOW_BYTES`).
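
   The bound in the two bullets can be made concrete with a back-of-the-envelope calculation. This is a simplified model that follows the comment's reasoning (message count capped at credit plus blocking-queue capacity; gRPC-held bytes capped by whichever of the credit or the per-stream window binds first, as the `KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT` Javadoc puts it), assuming uniformly sized messages. `InboundBufferBound`, the block sizes, and the stream count of 100 are hypothetical.

```java
/** Hypothetical worst-case buffering model for one receiving stream, following the review comment. */
final class InboundBufferBound {
  static final int DEFAULT_CREDIT = 128;                        // DEFAULT_GRPC_INBOUND_MESSAGE_CREDIT
  static final int BLOCKING_QUEUE_CAPACITY = 5;                 // heap-side queue size from the comment
  static final long DEFAULT_WINDOW_BYTES = 64L * 1024 * 1024;   // DEFAULT_GRPC_FLOW_CONTROL_WINDOW_BYTES

  /** Messages that can be queued for one stream: gRPC credit plus the heap blocking queue. */
  static int maxQueuedMessages(int credit, int queueCapacity) {
    return credit + queueCapacity;
  }

  /** Bytes held on the gRPC side: capped by the credit count or the stream window, whichever binds first. */
  static long maxGrpcSideBytes(int credit, long messageBytes, long windowBytes) {
    return Math.min((long) credit * messageBytes, windowBytes);
  }

  /** Receiver-wide bound from the window Javadoc: window x concurrent streams. */
  static long maxReceiverBytes(long windowBytes, int concurrentStreams) {
    return windowBytes * concurrentStreams;
  }

  public static void main(String[] args) {
    final long kib = 1024L;
    final long mib = 1024L * kib;
    System.out.println("queued messages: " + maxQueuedMessages(DEFAULT_CREDIT, BLOCKING_QUEUE_CAPACITY));
    // Small blocks: the credit binds first (128 x 64 KiB = 8 MiB, below the 64 MiB window).
    System.out.println("64 KiB blocks: "
        + maxGrpcSideBytes(DEFAULT_CREDIT, 64 * kib, DEFAULT_WINDOW_BYTES) / mib + " MiB");
    // Large blocks: the window binds first (128 x 1 MiB = 128 MiB, above the 64 MiB window).
    System.out.println("1 MiB blocks: "
        + maxGrpcSideBytes(DEFAULT_CREDIT, mib, DEFAULT_WINDOW_BYTES) / mib + " MiB");
    // Hypothetical: 100 concurrent streams into one server.
    System.out.println("100 streams: " + maxReceiverBytes(DEFAULT_WINDOW_BYTES, 100) / mib + " MiB");
  }
}
```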
   
   This is useful for small blocks that are sent very rapidly (which is probably not a common case in Pinot). Before this change, the number of messages in flight was limited to 1. That wasn't very problematic because we buffered indefinitely on the sender side, but with the backpressure gate, a limit of 1 blocks the sender (the window isn't considered ready).
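
   The credit-1 versus credit-128 difference can be illustrated with a JDK-only, single-threaded model in which the receiver holds a pool of message credits and the sender may push only while a credit is available. `java.util.concurrent.Semaphore` stands in for gRPC's `request(n)` accounting; `CreditChannel` and its methods are hypothetical and are not gRPC or Pinot APIs. Byte-level limits (the HTTP/2 window) are deliberately left out.

```java
import java.util.concurrent.Semaphore;

/** Hypothetical model of per-stream message credit (a stand-in for request(n)-style flow control). */
final class CreditChannel {
  private final Semaphore credits;
  private int inFlight; // plain int: this model is single-threaded

  CreditChannel(int initialCredit) {
    this.credits = new Semaphore(initialCredit);
  }

  /** Sender side: push one message only if a credit is available; returns false when blocked. */
  boolean trySend() {
    if (!credits.tryAcquire()) {
      return false;
    }
    inFlight++;
    return true;
  }

  /** Receiver side: the application finished one message, which returns one credit. */
  void consumeOne() {
    if (inFlight == 0) {
      throw new IllegalStateException("nothing to consume");
    }
    inFlight--;
    credits.release();
  }

  /** Messages the sender can push back-to-back before the receiver consumes anything. */
  static int burstBeforeBlocking(int initialCredit, int messagesAvailable) {
    CreditChannel channel = new CreditChannel(initialCredit);
    int sent = 0;
    while (sent < messagesAvailable && channel.trySend()) {
      sent++;
    }
    return sent;
  }

  public static void main(String[] args) {
    // Pre-change behaviour described in the comment: one message in flight.
    System.out.println("credit 1:   burst = " + burstBeforeBlocking(1, 1000));
    // PR default DEFAULT_GRPC_INBOUND_MESSAGE_CREDIT.
    System.out.println("credit 128: burst = " + burstBeforeBlocking(128, 1000));

    CreditChannel channel = new CreditChannel(1);
    channel.trySend();                 // uses the only credit
    System.out.println("blocked: " + !channel.trySend());
    channel.consumeOne();              // receiver consumes, credit returns
    System.out.println("resumed: " + channel.trySend());
  }
}
```

Per the `KEY_OF_GRPC_INBOUND_MESSAGE_CREDIT` Javadoc, a credit is returned when `MailboxContentObserver.onNext` returns, so with a credit of 1 every message waits for the previous one to be consumed; a larger credit lets consumption overlap with sending.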



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

