yashmayya commented on code in PR #18519: URL: https://github.com/apache/pinot/pull/18519#discussion_r3277728114
########## pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkGrpcMailboxSend.java: ########## @@ -0,0 +1,251 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.perf; + +import java.io.IOException; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.pinot.common.datatable.StatMap; +import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.common.utils.DataSchema.ColumnDataType; +import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.mailbox.ReceivingMailbox; +import org.apache.pinot.query.mailbox.SendingMailbox; +import org.apache.pinot.query.runtime.blocks.RowHeapDataBlock; +import org.apache.pinot.query.runtime.operator.MailboxSendOperator; +import org.apache.pinot.spi.config.instance.InstanceType; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.query.QueryThreadContext; +import 
org.apache.pinot.spi.utils.CommonConstants; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.OptionsBuilder; + + +/// Benchmark for the gRPC sender → receiver MSE mailbox path. +/// +/// One @Benchmark invocation sends a fixed total payload (`TOTAL_BYTES = 128 MiB`) split into +/// pre-computed `MseBlock.Data` blocks of size `_blockSizeBytes`, and waits until every block has +/// been polled out of the receiver mailbox. Measured time is therefore end-to-end "ship a request +/// of this size and process it on the other side" — closer to the unit Pinot actually cares about +/// than throughput of one-block-at-a-time hot-loop send. +/// +/// Axes: +/// +/// * `_blockSizeBytes` — three representative block sizes: +/// - `8 KiB` → 16384 blocks per invocation; sender is dominated by per-block overhead. +/// - `8 MiB` → 16 blocks; one block fits in a single gRPC chunk +/// (`maxInboundMessageSize / 2 ≈ 8 MiB`). +/// - `32 MiB` → 4 blocks; each block is split by `toByteStrings` into ~4 gRPC chunks. +/// * `_backpressureEnabled` — toggles the `GrpcSendingMailbox` `isReady()`-gate. `false` reverts +/// to the pre-fix unconditional `onNext` (kill-switch). +/// * `_flowControlWindowBytes` — HTTP/2 per-stream inbound window the receiver advertises +/// (`pinot.query.runner.grpc.flow.control.window.bytes`). 
Sweeps `{64 KiB, 1 MiB, 64 MiB}`, +/// spanning the historical default, BDP-estimated LAN value, and the new MB-scale default. +/// +/// The drainer runs as a separate thread (modelling the cross-operator boundary in a real query), +/// blocks on a `Semaphore` released by the receiver's `Reader` callback, and counts down a +/// per-invocation `CountDownLatch` for each polled block. The @Benchmark thread sends every block +/// then awaits the latch. +/// +/// Run with `org.openjdk.jmh.Main org.apache.pinot.perf.BenchmarkGrpcMailboxSend`. +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +@Fork(value = 1, jvmArgsAppend = { + "-Xms2g", "-Xmx4g", + "-XX:MaxDirectMemorySize=4g" +}) +@Warmup(iterations = 2, time = 5) +@Measurement(iterations = 3, time = 10) +@State(Scope.Benchmark) +public class BenchmarkGrpcMailboxSend { + + /// Total bytes shipped per @Benchmark invocation. Sized so that even at 32 MiB-per-block we + /// get a handful of blocks to bound the per-invocation latency. + private static final long TOTAL_BYTES = 128L * 1024 * 1024; + + private static final DataSchema SCHEMA = new DataSchema( + new String[]{"payload"}, new ColumnDataType[]{ColumnDataType.STRING}); + + /// Approximate bytes per `MseBlock.Data` payload. Each block carries a single String column, + /// one row, where the String is roughly `_blockSizeBytes` characters (ASCII → 1 byte per char + /// when serialised on the wire, ~2 bytes on heap). The number of blocks per invocation is + /// `ceil(TOTAL_BYTES / _blockSizeBytes)`. + @Param({"8192", "8388608", "33554432"}) + public int _blockSizeBytes; + + /// `true` exercises the sender-side `isReady()`-gate; `false` restores the pre-fix + /// unconditional `onNext` (kill-switch). Wired via + /// `pinot.query.runner.grpc.sender.backpressure.enabled`. 
+ @Param({"true", "false"}) + public boolean _backpressureEnabled; + + /// HTTP/2 per-stream flow-control window the receiver advertises, in bytes + /// (`pinot.query.runner.grpc.flow.control.window.bytes`): + /// * `65535` (~64 KiB) — historical gRPC default; gate engages frequently. + /// * `1048576` (~1 MiB) — typical BDP-estimated LAN value. + /// * `67108864` (64 MiB) — Pinot's new default; gate rarely engages under typical loads. + @Param({"65535", "1048576", "67108864"}) + public int _flowControlWindowBytes; + + private MailboxService _senderService; + private MailboxService _receiverService; + private SendingMailbox _sender; + private ReceivingMailbox _receiver; + private List<RowHeapDataBlock> _blocks; + + // Drainer signalling. + private final Semaphore _readSignal = new Semaphore(0); + private final AtomicBoolean _stop = new AtomicBoolean(); + private Thread _drainer; + private volatile CountDownLatch _drainedLatch; + + @Setup + public void setup() + throws IOException { + PinotConfiguration cfg = new PinotConfiguration(Map.of( + CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED, _backpressureEnabled, + CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES, _flowControlWindowBytes)); Review Comment: **This benchmark is broken by the new fail-fast validation in `1d29438dc0`.** `_flowControlWindowBytes` is swept over `{65535, 1048576, 67108864}` (line 123) but the `cfg` map here only overrides `KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES`. `maxInboundMessageSize` stays at the 16 MiB default (`DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES`). 
The new `Preconditions.checkArgument(_flowControlWindowBytes >= maxInboundMessageSize, ...)` at `GrpcMailboxServer.java:163` then rejects two of the three axis values at `MailboxService.start()`:

- `65535` < 16 MiB → `IllegalArgumentException`
- `1048576` < 16 MiB → `IllegalArgumentException`
- `67108864` (64 MiB) ≥ 16 MiB → passes

So 12 of the 18 `@Param` configurations blow up in `@Setup` instead of running. That is 3 block sizes × 2 backpressure settings × 2 broken window values = 12, out of 3 × 2 × 3 = 18 total. The whole point of the benchmark — comparing the gate's behavior across the three flow-control-window regimes — collapses to a single regime.

Fix is one line — mirror what `GrpcSenderBackpressureTightGateTest.java:118` already does:

```java
PinotConfiguration cfg = new PinotConfiguration(Map.of(
    CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_SENDER_BACKPRESSURE_ENABLED, _backpressureEnabled,
    CommonConstants.MultiStageQueryRunner.KEY_OF_GRPC_FLOW_CONTROL_WINDOW_BYTES, _flowControlWindowBytes,
    CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES,
    Math.min(_flowControlWindowBytes, CommonConstants.MultiStageQueryRunner.DEFAULT_MAX_INBOUND_QUERY_DATA_BLOCK_SIZE_BYTES)));
```

(The `Math.min` keeps the 64 MiB axis using the production-default 16 MiB max-message-size, which is what an operator would actually run. The two smaller axes pin `maxInboundMessageSize` down to the window so the validation passes. Note that the class Javadoc derives the gRPC chunk size from `maxInboundMessageSize / 2`. On those two axes, therefore, blocks are also split into much smaller chunks, and the "one 8 MiB block fits in a single chunk" description only holds on the 64 MiB axis. It is worth calling that out in the Javadoc so the results are not misread.)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
