gortiz commented on code in PR #18519:
URL: https://github.com/apache/pinot/pull/18519#discussion_r3288882119


##########
pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkGrpcMailboxSend.java:
##########
@@ -0,0 +1,251 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.perf;
+
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.pinot.common.datatable.StatMap;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.mailbox.ReceivingMailbox;
+import org.apache.pinot.query.mailbox.SendingMailbox;
+import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock;
+import org.apache.pinot.query.runtime.operator.MailboxSendOperator;
+import org.apache.pinot.spi.config.instance.InstanceType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.query.QueryThreadContext;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.OutputTimeUnit;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+
+
+/// Benchmark for the gRPC sender → receiver MSE mailbox path.
+///
+/// One @Benchmark invocation sends a fixed total payload (`TOTAL_BYTES = 128 MiB`) split into
+/// pre-computed `MseBlock.Data` blocks of size `_blockSizeBytes`, and waits until every block has
+/// been polled out of the receiver mailbox. Measured time is therefore end-to-end "ship a request
+/// of this size and process it on the other side" — closer to the unit Pinot actually cares about
+/// than throughput of one-block-at-a-time hot-loop send.
+///
+/// Axes:
+///
+///  * `_blockSizeBytes` — three representative block sizes:
+///      - `8 KiB`   → 16384 blocks per invocation; sender is dominated by per-block overhead.
+///      - `8 MiB`   → 16 blocks; one block fits in a single gRPC chunk
+///        (`maxInboundMessageSize / 2 ≈ 8 MiB`).
+///      - `32 MiB`  → 4 blocks; each block is split by `toByteStrings` into ~4 gRPC chunks.
+///  * `_backpressureEnabled` — toggles the `GrpcSendingMailbox` `isReady()`-gate. `false` reverts
+///    to the pre-fix unconditional `onNext` (kill-switch).
+///  * `_flowControlWindowBytes` — HTTP/2 per-stream inbound window the receiver advertises
+///    (`pinot.query.runner.grpc.flow.control.window.bytes`). Sweeps `{64 KiB, 1 MiB, 64 MiB}`,
+///    spanning the historical default, BDP-estimated LAN value, and the new MB-scale default.
+///
+/// The drainer runs as a separate thread (modelling the cross-operator boundary in a real query),
+/// blocks on a `Semaphore` released by the receiver's `Reader` callback, and counts down a
+/// per-invocation `CountDownLatch` for each polled block. The @Benchmark thread sends every block
+/// then awaits the latch.
+///
+/// Run with `org.openjdk.jmh.Main org.apache.pinot.perf.BenchmarkGrpcMailboxSend`.
+@BenchmarkMode(Mode.AverageTime)
+@OutputTimeUnit(TimeUnit.MILLISECONDS)
+@Fork(value = 1, jvmArgsAppend = {
+    "-Xms2g", "-Xmx4g",
+    "-XX:MaxDirectMemorySize=4g"
+})
+@Warmup(iterations = 2, time = 5)
+@Measurement(iterations = 3, time = 10)
+@State(Scope.Benchmark)
+public class BenchmarkGrpcMailboxSend {
+
+  /// Total bytes shipped per @Benchmark invocation. Sized so that even at 32 MiB-per-block we
+  /// get a handful of blocks to bound the per-invocation latency.
+  private static final long TOTAL_BYTES = 128L * 1024 * 1024;
+
+  private static final DataSchema SCHEMA = new DataSchema(
+      new String[]{"payload"}, new ColumnDataType[]{ColumnDataType.STRING});
+
+  /// Approximate bytes per `MseBlock.Data` payload. Each block carries a single String column,
+  /// one row, where the String is roughly `_blockSizeBytes` characters (ASCII → 1 byte per char
+  /// when serialised on the wire, ~2 bytes on heap). The number of blocks per invocation is
+  /// `ceil(TOTAL_BYTES / _blockSizeBytes)`.
+  @Param({"8192", "8388608", "33554432"})
+  public int _blockSizeBytes;
+
+  /// `true` exercises the sender-side `isReady()`-gate; `false` restores the pre-fix
+  /// unconditional `onNext` (kill-switch). Wired via
+  /// `pinot.query.runner.grpc.sender.backpressure.enabled`.
+  @Param({"true", "false"})
+  public boolean _backpressureEnabled;
+
+  /// HTTP/2 per-stream flow-control window the receiver advertises, in bytes
+  /// (`pinot.query.runner.grpc.flow.control.window.bytes`):
+  ///  * `65535`    (~64 KiB) — historical gRPC default; gate engages frequently.
+  ///  * `1048576`  (~1 MiB)  — typical BDP-estimated LAN value.
+  ///  * `67108864` (64 MiB)  — Pinot's new default; gate rarely engages under typical loads.
+  @Param({"65535", "1048576", "67108864"})
+  public int _flowControlWindowBytes;
+
+  private MailboxService _senderService;
+  private MailboxService _receiverService;
+  private SendingMailbox _sender;
+  private ReceivingMailbox _receiver;
+  private List<RowHeapDataBlock> _blocks;
+
+  // Drainer signalling.
+  private final Semaphore _readSignal = new Semaphore(0);
+  private final AtomicBoolean _stop = new AtomicBoolean();
+  private Thread _drainer;
+  private volatile CountDownLatch _drainedLatch;
+
+  @Setup
+  public void setup()
+      throws IOException {
+    PinotConfiguration cfg = new PinotConfiguration(Map.of(
+        CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED, _backpressureEnabled,
+        CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES, _flowControlWindowBytes));

Review Comment:
   Fixed in `6116e5442d`. Same `Math.min(window, default)` clamp you proposed, 
applied to the bench's `PinotConfiguration` map.
   
   You're right that I should have caught this when I added the validation — 
`GrpcSenderBackpressureTightGateTest` (which I also wrote) was already using 
exactly this clamp pattern, but I missed updating the bench in the same change. 
The bench's `_flowControlWindowBytes` axis is back to running all three cells 
(64 KiB / 1 MiB / 64 MiB).
   
   Small correction to the description: the post-rewrite bench has 3 axes 
(`_blockSizeBytes` × 3, `_backpressureEnabled` × 2, `_flowControlWindowBytes` × 
3 = 18 cells, not 24 — the old `_payloadBytes` × 4 axis was dropped when the 
bench moved to a request-shape probe in `a597d63a64`). But the conclusion was 
right — 12 of 18 cells were blowing up at `@Setup`.
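   The drainer handoff described in the bench's Javadoc can be sketched in isolation. This is a minimal, hypothetical stand-in, not the PR's code: a `ConcurrentLinkedQueue` plays the receiving mailbox, and `offer` followed by `release` plays the receiver's `Reader` callback. The field names mirror the bench's `_readSignal`, `_stop`, `_drainer` and `_drainedLatch`; `blocksPerInvocation` and `runInvocation` are illustrative helpers. The ceiling division reproduces the Javadoc's block counts for a 128 MiB total.

   ```java
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.CountDownLatch;
   import java.util.concurrent.Semaphore;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicBoolean;

   // Hypothetical sketch: the queue stands in for the ReceivingMailbox.
   public class DrainerSketch {
     private final ConcurrentLinkedQueue<byte[]> _mailbox = new ConcurrentLinkedQueue<>();
     private final Semaphore _readSignal = new Semaphore(0);
     private final AtomicBoolean _stop = new AtomicBoolean();
     private volatile CountDownLatch _drainedLatch;
     private Thread _drainer;

     // ceil(totalBytes / blockSizeBytes), as in the bench's Javadoc.
     static long blocksPerInvocation(long totalBytes, long blockSizeBytes) {
       return (totalBytes + blockSizeBytes - 1) / blockSizeBytes;
     }

     void start() {
       _drainer = new Thread(() -> {
         while (!_stop.get()) {
           try {
             // Timed wait so the loop can re-check _stop without being interrupted.
             if (!_readSignal.tryAcquire(100, TimeUnit.MILLISECONDS)) {
               continue;
             }
           } catch (InterruptedException e) {
             return;
           }
           // One permit may cover several queued blocks; drain everything available.
           while (_mailbox.poll() != null) {
             _drainedLatch.countDown();
           }
         }
       }, "drainer");
       _drainer.setDaemon(true);
       _drainer.start();
     }

     // One "invocation": send every block, then wait until the drainer has polled them all.
     boolean runInvocation(long totalBytes, int blockSizeBytes) throws InterruptedException {
       int n = (int) blocksPerInvocation(totalBytes, blockSizeBytes);
       _drainedLatch = new CountDownLatch(n);
       byte[] payload = new byte[blockSizeBytes]; // reused: only the handoff is being modelled
       for (int i = 0; i < n; i++) {
         _mailbox.offer(payload);  // stand-in for the gRPC send
         _readSignal.release();    // stand-in for the receiver's Reader callback
       }
       return _drainedLatch.await(10, TimeUnit.SECONDS);
     }

     void stop() throws InterruptedException {
       _stop.set(true);
       _drainer.join();
     }

     public static void main(String[] args) throws InterruptedException {
       DrainerSketch sketch = new DrainerSketch();
       sketch.start();
       // 1 MiB in 8 KiB blocks = 128 blocks.
       System.out.println(sketch.runInvocation(1L << 20, 8192));
       sketch.stop();
     }
   }
   ```

   Because each latch is assigned before any of its blocks is offered, and the queue hands blocks across threads safely, the drainer always counts a block against the latch of the invocation that sent it; leftover permits from an earlier invocation only cause an empty poll.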



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/mailbox/GrpcSendingMailbox.java:
##########
@@ -125,7 +169,7 @@ private boolean sendInternal(MseBlock block, List<DataBuffer> serializedStats) {
       _contentObserver = getContentObserver();

Review Comment:
   Fixed in `04eb98577c`. The race was real — I landed a regression test 
(`concurrentSendAndCancelInitializeContentObserverExactlyOnce`) that reproduces 
it: sender thread + cancel thread driven through `CyclicBarrier(2)`, asserts 
exactly one observer is created across 200 iterations; fires immediately on 
iteration 0 against unfixed code (counter sees 2), passes deterministically 
with the fix.
   
   Fix uses the standard double-checked-lock idiom against the existing 
`_readyLock`:
   
   ```java
   private void ensureContentObserverInitialized() {
     if (_contentObserver != null) {
       return;                          // fast path, volatile read
     }
     _readyLock.lock();
     try {
       if (_contentObserver == null) {
         _contentObserver = getContentObserver();
       }
     } finally {
       _readyLock.unlock();
     }
   }
   ```
   
   Both bare `if (_contentObserver == null)` blocks (in `sendInternal` and 
`cancel`) now route through this helper. Kept the lazy init rather than eagerly 
initializing in the constructor — `cancel`-before-first-send must still open a 
stream so the receiver gets an explicit error EOS, so the lazy-init-on-cancel 
comment in the cancel path is load-bearing.
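   A simplified sketch of the regression test's shape, assuming a hypothetical `Mailbox` stand-in that keeps only the lazy-init helper and counts calls to its own `getContentObserver()`; the real test drives `GrpcSendingMailbox` itself. Two threads meet at a `CyclicBarrier(2)` and both call the helper; the check fails if any of 200 iterations creates more than one observer.

   ```java
   import java.util.concurrent.CyclicBarrier;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.concurrent.locks.ReentrantLock;

   public class LazyObserverRaceSketch {
     // Hypothetical stand-in for GrpcSendingMailbox: only the lazy observer init is kept.
     static final class Mailbox {
       private final ReentrantLock _readyLock = new ReentrantLock();
       private volatile Object _contentObserver;
       final AtomicInteger _created = new AtomicInteger();

       // Stand-in for getContentObserver(): counts how many observers get opened.
       private Object getContentObserver() {
         _created.incrementAndGet();
         return new Object();
       }

       void ensureContentObserverInitialized() {
         if (_contentObserver != null) {
           return;                      // fast path, volatile read
         }
         _readyLock.lock();
         try {
           if (_contentObserver == null) {
             _contentObserver = getContentObserver();
           }
         } finally {
           _readyLock.unlock();
         }
       }
     }

     // Races a "send" and a "cancel" thread on a fresh mailbox per iteration.
     // Returns the first iteration that created more than one observer, or -1.
     static int firstFailingIteration(int iterations) throws Exception {
       for (int i = 0; i < iterations; i++) {
         Mailbox mailbox = new Mailbox();
         CyclicBarrier barrier = new CyclicBarrier(2);
         Runnable racer = () -> {
           try {
             barrier.await();           // release both threads together
           } catch (Exception e) {
             throw new RuntimeException(e);
           }
           mailbox.ensureContentObserverInitialized();
         };
         Thread send = new Thread(racer, "send");
         Thread cancel = new Thread(racer, "cancel");
         send.start();
         cancel.start();
         send.join();
         cancel.join();
         if (mailbox._created.get() != 1) {
           return i;
         }
       }
       return -1;
     }

     public static void main(String[] args) throws Exception {
       System.out.println(firstFailingIteration(200));
     }
   }
   ```

   Without the lock and the second null check, both racers can pass the first check and each open an observer, which is the condition the counter detects.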
   
   Your thread-safety prompt also pointed the way to a symmetric race we 
surfaced in a pre-push strict review: `send(Eos)` racing `cancel`'s 
`onCompleted` on the half-close edge — both pass their top-of-method 
`isTerminated()` check, cancel wins the lock, calls `onCompleted`, and the EOS 
path's `onNext` / `onCompleted` then throws `IllegalStateException` from gRPC. 
Fixed in `840c5640ea` with a defensive try/catch around the EOS path (matching 
the existing pattern in `cancel`) plus a second regression test that races 
`send(Eos)` and `cancel` and asserts neither throws. Plus a half-open-stream 
leak in `close()` fixed in `aeaacc893d` for completeness. Both worth flagging 
since they're in the same observer-lifecycle area you were looking at.
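   A sketch of the `send(Eos)` vs `cancel` race and the defensive try/catch, under stated assumptions: `FakeObserver` is a hypothetical stand-in that, like a gRPC client call, throws `IllegalStateException` from `onNext`/`onCompleted` once the stream is half-closed; `Sender`, `_terminated` and the empty catch bodies are illustrative, not the PR's implementation. `race` returns the first throwable seen across iterations, or `null` if neither racer threw.

   ```java
   import java.util.concurrent.CyclicBarrier;
   import java.util.concurrent.atomic.AtomicReference;
   import java.util.concurrent.locks.ReentrantLock;

   public class EosCancelRaceSketch {
     // Hypothetical stand-in for a gRPC request observer: rejects calls after half-close.
     static final class FakeObserver {
       private boolean _halfClosed;

       synchronized void onNext(String message) {
         if (_halfClosed) {
           throw new IllegalStateException("call was half-closed");
         }
       }

       synchronized void onCompleted() {
         if (_halfClosed) {
           throw new IllegalStateException("call already half-closed");
         }
         _halfClosed = true;
       }
     }

     // Hypothetical sender: both paths can pass the unlocked _terminated check at once.
     static final class Sender {
       private final FakeObserver _observer = new FakeObserver();
       private final ReentrantLock _lock = new ReentrantLock();
       private volatile boolean _terminated;

       void sendEos() {
         if (_terminated) {
           return;                      // top-of-method check; not sufficient on its own
         }
         _lock.lock();
         try {
           _observer.onNext("EOS");
           _observer.onCompleted();
         } catch (IllegalStateException e) {
           // cancel won the lock and already half-closed the stream; nothing left to send
         } finally {
           _terminated = true;
           _lock.unlock();
         }
       }

       void cancel() {
         if (_terminated) {
           return;
         }
         _lock.lock();
         try {
           _observer.onCompleted();
         } catch (IllegalStateException e) {
           // the EOS path already half-closed the stream
         } finally {
           _terminated = true;
           _lock.unlock();
         }
       }
     }

     static Throwable race(int iterations) throws InterruptedException {
       AtomicReference<Throwable> failure = new AtomicReference<>();
       for (int i = 0; i < iterations && failure.get() == null; i++) {
         Sender sender = new Sender();
         CyclicBarrier barrier = new CyclicBarrier(2);
         Thread eos = new Thread(() -> runAfter(barrier, sender::sendEos, failure));
         Thread cancel = new Thread(() -> runAfter(barrier, sender::cancel, failure));
         eos.start();
         cancel.start();
         eos.join();
         cancel.join();
       }
       return failure.get();
     }

     // Waits at the barrier, runs the action, and records anything it throws.
     private static void runAfter(CyclicBarrier barrier, Runnable action, AtomicReference<Throwable> failure) {
       try {
         barrier.await();
         action.run();
       } catch (Throwable t) {
         failure.compareAndSet(null, t);
       }
     }

     public static void main(String[] args) throws InterruptedException {
       System.out.println(race(200));
     }
   }
   ```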



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
