gortiz opened a new pull request, #18458:
URL: https://github.com/apache/pinot/pull/18458

   ## Summary
   
   Implements the proposal from #18375: a new opt-in mode where servers push 
per-opchain stats directly to the broker over a long-lived bidi gRPC stream, 
instead of piggy-backing them onto the mailbox just before EOS.
   
   Resolves #18375.
   
   ### What the issue proposed vs. what is implemented
   
   The implementation follows the issue's proposal closely, with a few 
clarifications:
   
   | Area | Issue proposal | This PR |
   |---|---|---|
   | New RPC | `SubmitWithStream(stream BrokerToServer) returns (stream ServerToBroker)` | ✅ Implemented as specified |
   | `BrokerToServer` | `submit` + `cancel` payloads | ✅ Both implemented; `cancel` routes to `QueryRunner.cancel()` and also fires on stream-close |
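   | `ServerToBroker` | `submit_ack` + `OpChainComplete` + `ServerDone` | ✅ `submit_ack` and `OpChainComplete` implemented as specified; `ServerDone` is implicit (stream half-close, see the divergences below) |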
   | `MultiStageStatsTree` / `StageStatsNode` | Structured tree-shaped payload with operator type, plan-node ids, stat bytes, children | ✅ Implemented; encoder walks the live operator tree + flat `MultiStageQueryStats` lists; decoder produces a `StageStatsTreeNode` accumulator on the broker |
   | Op→PlanNode mapping | Captured via existing `BiConsumer` tracker in `PlanNodeToOpChain`; leaf sub-tree walked | ✅ Implemented on `OpChainExecutionContext` |
   | Stats in mailbox | Suppressed when stream mode is active | ✅ Server sends EOS without stats when `SubmitWithStream` is in use |
   | Broker accumulator | Per-stage `Map<Integer, StageStatsNode>` with `StatMap.merge` per node; tree-shape mismatch → `mergeFailed` | ✅ `StreamingQuerySession` owns the accumulator and per-stage coverage counters |
   | Wait window | After data-mailbox EOS, broker waits up to `min(50 ms, remaining timeout)` for outstanding `OpChainComplete`s | ✅ Implemented in `submitAndReduceWithStream` |
   | Fan-out cancel | On first peer error, broker sends `BrokerToServer.cancel` on all open streams | ✅ `StreamingQuerySession.fanOutCancel()` |
   | Per-stage coverage | `responded` / `mergeFailed` / `missing` exposed in broker response as `streamStatsCoverage[]` | ✅ Exposed on `QueryResult` and broker JSON response |
   | Config | Cluster-level `pinot.broker.multistage.use.stream.stats.reporting` + per-query `useStreamStatsReporting` option | ✅ Implemented; default is `false` (legacy mode unchanged) |
   | Fallback | Broker detects `UNIMPLEMENTED` and falls back to legacy for that query | ✅ Implemented in `DispatchClient` |
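
The accumulator, coverage, and wait-window rows above can be illustrated with a minimal Java sketch. It is not the code in this PR: `StageAccumulatorSketch`, its `Node` type, and the single `rows` counter are hypothetical stand-ins for `StreamingQuerySession`, `StageStatsNode`, and `StatMap`, and the sketch assumes that `missing` is simply the expected opchain count minus the reports that were merged or rejected.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified stand-in for a broker-side per-stage accumulator. */
final class StageAccumulatorSketch {

  /** One node of a stage's stats tree: operator type, counters, children. */
  static final class Node {
    final String type;
    final Map<String, Long> stats = new HashMap<>();
    final List<Node> children = new ArrayList<>();

    Node(String type, long rows, Node... kids) {
      this.type = type;
      stats.put("rows", rows);
      children.addAll(List.of(kids));
    }
  }

  final int expected;   // opchains expected to report for this stage (assumption)
  int responded;        // reports merged successfully
  int mergeFailed;      // reports whose tree shape did not match
  Node root;            // merged tree, null until the first report arrives

  StageAccumulatorSketch(int expected) {
    this.expected = expected;
  }

  int missing() {
    return expected - responded - mergeFailed;
  }

  /** Merges one opchain's tree; a shape mismatch is counted, not thrown. */
  synchronized void onOpChainComplete(Node incoming) {
    if (root == null) {
      root = incoming;
      responded++;
    } else if (sameShape(root, incoming)) {
      merge(root, incoming);
      responded++;
    } else {
      mergeFailed++;
    }
  }

  /** Two trees match when operator types and arity agree at every node. */
  static boolean sameShape(Node a, Node b) {
    if (!a.type.equals(b.type) || a.children.size() != b.children.size()) {
      return false;
    }
    for (int i = 0; i < a.children.size(); i++) {
      if (!sameShape(a.children.get(i), b.children.get(i))) {
        return false;
      }
    }
    return true;
  }

  /** Sums counters node by node; callers must check sameShape first. */
  static void merge(Node into, Node from) {
    from.stats.forEach((k, v) -> into.stats.merge(k, v, Long::sum));
    for (int i = 0; i < into.children.size(); i++) {
      merge(into.children.get(i), from.children.get(i));
    }
  }

  /** Wait window after data EOS: min(50 ms, remaining timeout), never negative. */
  static long waitWindowMs(long remainingTimeoutMs) {
    return Math.max(0L, Math.min(50L, remainingTimeoutMs));
  }
}
```

With four expected opchains, two matching three-way `UNION` reports and one two-way report would yield `responded = 2`, `mergeFailed = 1`, and `missing = 1` in this sketch. Counting a mismatch rather than throwing keeps the rest of the stage's stats usable.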
   
   **Notable divergences / implementation decisions not in the issue:**
   
   - **`ServerDone` is implicit, not a separate message.** The server 
half-closes the outbound stream (calls `onCompleted()`) after the last 
`OpChainComplete` rather than sending a dedicated `ServerDone` message. This is 
equivalent and avoids sending an extra message on the stream.
   - **Cancel via stream replaces the unary `Cancel` RPC for stream-mode 
queries (Phase B).** The separate `Cancel` RPC is kept alive for one release 
for backward compatibility but its response no longer carries stats.
   - **O(1) cancel in `OpChainSchedulerService` (all modes).** The previous 
`cancel()` performed an O(n) scan of `_opChainCache`. A new 
`_executionContextByRequest` map plus an `_activeOpChainsByRequest` reference 
counter allow a direct `QueryExecutionContext.terminate()` call without a scan. 
The write lock is acquired *before* the `compute()` eviction to close the race 
window between context removal and the cancelled-query cache write.
   - **`tryRecoverWithStream`** fans out cancel and then waits for remaining 
`OpChainComplete` stats up to the query deadline, so error-path stats are as 
complete as possible.
   - **N-ary set-op tree reconstruction is now correct.** The legacy 
`InStageStatsTreeBuilder` heuristic (implicit arity) was lossy for set ops with 
more than two inputs. The new `MultiStageStatsTree` format carries the tree 
shape explicitly, so the broker reconstructs it exactly regardless of arity.
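
The O(1) cancel idea from the list above can be sketched in Java as a request-keyed map with a per-request opchain count. This is an illustration only: `RequestCancelSketch` and `Context` are hypothetical names, the two structures named in the PR are folded into one map for brevity, and the write lock and cancelled-query cache are left out.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch of request-keyed cancel with a per-request opchain count. */
final class RequestCancelSketch {

  /** Stand-in for a per-query execution context that can be terminated. */
  static final class Context {
    final AtomicBoolean terminated = new AtomicBoolean();
    int activeOpChains;   // only mutated inside compute(), which is atomic per key
  }

  private final Map<Long, Context> contextByRequest = new ConcurrentHashMap<>();

  /** Registers one opchain; the first one for a request creates the context. */
  Context register(long requestId) {
    return contextByRequest.compute(requestId, (id, ctx) -> {
      Context c = (ctx == null) ? new Context() : ctx;
      c.activeOpChains++;
      return c;
    });
  }

  /** Called when an opchain finishes; the last one evicts the context. */
  void onOpChainDone(long requestId) {
    contextByRequest.compute(requestId, (id, ctx) -> {
      if (ctx == null) {
        return null;
      }
      // Returning null from compute() removes the mapping.
      return (--ctx.activeOpChains <= 0) ? null : ctx;
    });
  }

  /** Direct lookup by request id; no scan over all running opchains. */
  boolean cancel(long requestId) {
    Context ctx = contextByRequest.get(requestId);
    if (ctx == null) {
      return false;
    }
    ctx.terminated.set(true);
    return true;
  }

  int trackedRequests() {
    return contextByRequest.size();
  }
}
```

Because the count is updated inside `compute()`, the eviction of the last opchain and the registration of a new one for the same request cannot interleave on that key, which is the property the reference counter is meant to provide in this sketch.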
   
   ### Key files
   
   | File | Role |
   |---|---|
   | `pinot-common/src/main/proto/worker.proto` | New RPC + `BrokerToServer` / `ServerToBroker` / `OpChainComplete` / `MultiStageStatsTree` / `StageStatsNode` messages |
   | `QueryServer.java` | `SubmitWithStream` handler: plan submission, opchain completion callbacks, cancel-via-stream, stream-close-as-cancel |
   | `MultiStageStatsTreeEncoder.java` | Server-side: walks live operator tree + `MultiStageQueryStats` flat lists → `MultiStageStatsTree` proto |
   | `MultiStageStatsTreeDecoder.java` | Broker-side: decodes proto → `StageStatsTreeNode` accumulator; detects shape mismatches |
   | `StreamingQuerySession.java` | Per-query accumulator: per-stage tree merge, coverage counters, completion latch, fan-out cancel |
   | `QueryDispatcher.java` | Opens `SubmitWithStream` streams; wait window; error recovery; stats coverage in response |
   | `OpChainSchedulerService.java` | O(1) cancel; completion listener registration for stream-mode stats push |
   | `OpChainExecutionContext.java` | Op→PlanNode map captured at construction time |
   | `StreamStatsReportingIntegrationTest.java` | Integration tests: simple aggregation, join (≥3 stages), three-way UNION (N-ary set op regression), cluster-level config activation |
   
   ### Test plan
   
   - [x] `StreamStatsReportingIntegrationTest` — 4 tests covering simple 
aggregation, join, three-way UNION (N-ary set-op regression), cluster-level 
config; all pass
   - [x] `MultiStageStatsTreeEncoderTest` / `MultiStageStatsTreeDecoderTest` — 
round-trip, merge, shape-mismatch handling
   - [x] `StreamingQuerySessionTest` — coverage counters, fan-out cancel, latch 
semantics
   - [x] `OpChainSchedulerServiceTest` — O(1) cancel, multi-opchain context 
cleanup, completion listener lifecycle
   - [x] `DispatchClientTest` / `StreamingDispatchObserverTest` — 
`SubmitWithStream` client-side observer, `UNIMPLEMENTED` fallback
   - [x] Legacy mode unchanged: all existing MSE integration tests unaffected 
(stream mode is opt-in, default off)
   
   🤖 Generated with [Claude Code](https://claude.com/claude-code)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

