timothy-e opened a new pull request, #18791: URL: https://github.com/apache/pinot/pull/18791
https://github.com/apache/pinot/pull/18553 added in-flight request tracking for MSE in the adaptive routing stats. This PR adds per-server latency to those stats. The diff looks large, but much of it is tests, and the new files come with a certain amount of Java boilerplate.

**1. The planner marks stages as needing timing.** The planner assigns `FragmentType`s, and the dispatcher turns those roles into timing collection in `AdaptiveRoutingStageClassification.java`. Collection is always enabled on stage 0. Otherwise it is enabled only on trusted stages that either receive directly from pure leaf stages or are singleton-leaf stages themselves. Stages fed by non-leaf senders are excluded so that their timings do not include upstream cascade delay.

**2. Stages track their elapsed time and return it as the `UPSTREAM_SERVER_RESPONSE_TIMES_MS` stat** (`BaseMailboxReceiveOperator`, `BlockingMultiStreamConsumer`, `AdaptiveRoutingUpstreamTimings`). Each `BaseMailboxReceiveOperator` tracks per-sender wall-clock elapsed time, from when its `BlockingMultiStreamConsumer.OfMseBlock` is constructed until each sender's EOS arrives.

The start time is captured at consumer construction, not at query start, to avoid pipeline-breaker inflation: if a pipeline breaker on this stage blocks for 1000ms waiting on a slow sender, the fast senders' EOS blocks sit unconsumed in their queues during that time, and measuring from query start would report ~1000ms for all of them.

These per-sender timings are encoded as `"hostname|port=elapsedMs;..."` in a new `UPSTREAM_SERVER_RESPONSE_TIMES_MS` stat key on `BaseMailboxReceiveOperator.StatKey` and propagated up through the query stats. A serialized string is used because it is more straightforward than adding a Map type to the `StatKey` system.

**3. The broker reads leaf-server timings.** It resolves `hostname|mailboxPort` sender keys to full instance IDs and accumulates the maximum observed latency per instance across all trusted stages.
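To make the `"hostname|port=elapsedMs;..."` round trip from steps 2 and 3 concrete, here is a minimal Java sketch. Everything in it is hypothetical: `UpstreamTimingSketch`, `SenderTimer`, `onEos`, `injectPending`, and `accumulateMax` are illustrative names, not the classes in this PR, and the instance IDs in `main` are made up. It assumes a sender's elapsed time is recorded once, at its first EOS, that pending senders on cancellation get the elapsed time so far (as the query-timeout handling describes), and that the broker keeps a per-instance maximum via `Map.merge`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical sketch; not Pinot's BaseMailboxReceiveOperator / AdaptiveRoutingUpstreamTimings code.
public class UpstreamTimingSketch {

  /** Per-stage tracker: elapsed time per sender, measured from consumer construction. */
  public static final class SenderTimer {
    private final long startMs;
    // Insertion order keeps the encoded string deterministic.
    private final Map<String, Long> elapsedBySender = new LinkedHashMap<>();

    public SenderTimer(long startMs) {
      // Captured when the consumer is constructed, not at query start, so a pipeline
      // breaker blocking on a slow sender does not inflate the fast senders' timings.
      this.startMs = startMs;
    }

    /** Records a sender's elapsed time when its EOS block is consumed (first EOS wins). */
    public void onEos(String hostname, int mailboxPort, long nowMs) {
      elapsedBySender.putIfAbsent(hostname + "|" + mailboxPort, nowMs - startMs);
    }

    /** Cancellation path: completed timings are kept; pending senders get the elapsed time so far. */
    public void injectPending(List<String> senderKeys, long nowMs) {
      for (String key : senderKeys) {
        elapsedBySender.putIfAbsent(key, nowMs - startMs);
      }
    }

    /** Encodes timings as "hostname|port=elapsedMs;...". */
    public String encode() {
      StringJoiner joiner = new StringJoiner(";");
      elapsedBySender.forEach((key, elapsedMs) -> joiner.add(key + "=" + elapsedMs));
      return joiner.toString();
    }
  }

  /** Broker side: resolves sender keys to instance IDs and keeps the max latency per instance. */
  public static void accumulateMax(String encoded, Map<String, String> instanceIdBySenderKey,
      Map<String, Long> maxLatencyByInstance) {
    if (encoded == null || encoded.isEmpty()) {
      return;
    }
    for (String entry : encoded.split(";")) {
      int eq = entry.lastIndexOf('=');
      if (eq <= 0) {
        continue; // malformed entry
      }
      String instanceId = instanceIdBySenderKey.get(entry.substring(0, eq));
      if (instanceId == null) {
        continue; // sender could not be resolved to a known instance
      }
      try {
        long elapsedMs = Long.parseLong(entry.substring(eq + 1));
        maxLatencyByInstance.merge(instanceId, elapsedMs, Long::max);
      } catch (NumberFormatException e) {
        // skip entries whose value is not a number
      }
    }
  }

  public static void main(String[] args) {
    // Stage A: server-1 finishes after 50 ms; the query is cancelled at t=2000 before server-2 sends EOS.
    SenderTimer stageA = new SenderTimer(1_000);
    stageA.onEos("server-1", 8442, 1_050);
    stageA.injectPending(List.of("server-1|8442", "server-2|8442"), 2_000);

    // Stage B: server-1 is slower here (120 ms).
    SenderTimer stageB = new SenderTimer(0);
    stageB.onEos("server-1", 8442, 120);

    // Made-up instance IDs for the two servers.
    Map<String, String> instanceIds = Map.of(
        "server-1|8442", "Server_server-1_8098",
        "server-2|8442", "Server_server-2_8098");

    Map<String, Long> maxLatency = new HashMap<>();
    accumulateMax(stageA.encode(), instanceIds, maxLatency);
    accumulateMax(stageB.encode(), instanceIds, maxLatency);

    System.out.println(stageA.encode()); // server-1|8442=50;server-2|8442=1000
    System.out.println(maxLatency.get("Server_server-1_8098")); // 120
    System.out.println(maxLatency.get("Server_server-2_8098")); // 1000
  }
}
```

In this sketch, `putIfAbsent` is what keeps a completed timing from being overwritten when pending senders are filled in on cancellation, and taking the maximum across stages means a server that is slow in any trusted stage shows up as slow.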
A `decrementedServers` set prevents double-recording.

**4. Query timeout.** On query cancellation (e.g. when the timeout is reached), we try to cancel with stats. Completed timings are preserved, and pending senders are assigned the elapsed time so far.

### Testing

**1. Performance impact**: We ran some performance tests and saw no impact.

**2. Manually with tc**

Choose a server to induce latency on, `ssh` to that server, and run:

```
# Attach a prio qdisc (3 bands: 1:1, 1:2, 1:3) at the root of ens5
sudo tc qdisc add dev ens5 root handle 1: prio
# Add 1200ms of netem delay to band 1:3
sudo tc qdisc add dev ens5 parent 1:3 handle 30: netem delay 1200ms
# Steer IP packets with destination port 8442 into band 1:3
sudo tc filter add dev ens5 protocol ip parent 1:0 prio 3 u32 \
  match ip dport 8442 0xffff flowid 1:3
```

We induced latency only for MSE to validate that adaptive routing can be influenced by MSE queries alone. To reset the latency:

```
sudo tc qdisc del dev ens5 root  # cleanup
```

We saw the score spike for the targeted servers.

**3. Chaos Agent**

We kicked off a chaos agent run with the following schedule:

1. 90s of 600ms **MSE latency**
2. 30s with no faults
3. 90s of 600ms **SSE latency**
4. 90s with no faults
5. 90s of 1200ms **MSE latency**
6. 30s with no faults
7. 90s of 1200ms **SSE latency**

The query timeout is 1s.

<img width="1462" height="916" alt="image" src="https://github.com/user-attachments/assets/42fea0fc-b2ed-4070-be6b-9e60f7446a40" />
