timothy-e opened a new pull request, #18791:
URL: https://github.com/apache/pinot/pull/18791

   https://github.com/apache/pinot/pull/18553 added in-flight request tracking 
for MSE in the adaptive routing stats. This PR adds per-server latency. The 
diff looks large, but much of it is tests, and the new files it introduces 
come with a certain amount of Java boilerplate.
   
   **1. The planner marks stages as needing timing**. It assigns 
`FragmentType`s, and the dispatcher turns those roles into timing-collection 
decisions in `AdaptiveRoutingStageClassification.java`. Collection is always 
enabled on stage 0. Otherwise it is enabled only on trusted stages, meaning 
stages that either receive directly from pure leaves or are singleton-leaf 
stages themselves. Stages fed by non-leaf senders are excluded so that their 
timings do not include upstream cascade delay.
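
The rule above can be sketched as a single predicate. This is only an illustration of the described behavior, not the code in `AdaptiveRoutingStageClassification.java`: the `Role` enum, its values, and the method signature are hypothetical stand-ins for `FragmentType`, and the sketch assumes that a singleton-leaf stage is trusted regardless of its senders.

```java
import java.util.List;

/** Hypothetical sketch of the stage classification rule; all names are assumptions. */
final class StageTimingSketch {
  /** Stand-in for the planner-assigned roles; the real FragmentType values may differ. */
  enum Role { PURE_LEAF, SINGLETON_LEAF, INTERMEDIATE }

  /** Returns true if per-sender timings should be collected on this stage. */
  static boolean collectsTiming(int stageId, Role role, List<Role> senderRoles) {
    if (stageId == 0) {
      return true; // stage 0 always collects
    }
    if (role == Role.SINGLETON_LEAF) {
      return true; // assumption: singleton-leaf stages are trusted as-is
    }
    // Trusted only when every sender is a pure leaf, so that the measured
    // time does not include delay cascading from further upstream.
    return !senderRoles.isEmpty()
        && senderRoles.stream().allMatch(r -> r == Role.PURE_LEAF);
  }
}
```

Under these assumptions, an intermediate stage fed by two pure leaves collects timings, while the same stage with one intermediate sender does not.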
   
   **2. Stages track their elapsed time and return it as the 
`UPSTREAM_SERVER_RESPONSE_TIMES_MS` stat** (`BaseMailboxReceiveOperator`, 
`BlockingMultiStreamConsumer`, `AdaptiveRoutingUpstreamTimings`)
   
   Each `BaseMailboxReceiveOperator` tracks per-sender wall-clock elapsed time 
(from when its `BlockingMultiStreamConsumer.OfMseBlock` is constructed until 
each sender's EOS arrives). The start time is captured at consumer construction 
time, not at query start, to avoid pipeline-breaker inflation: if a pipeline 
breaker on this stage blocks for 1000ms waiting on a slow sender, fast senders' 
EOS blocks sit unconsumed in their queues during that time, and measuring from 
query start would report ~1000ms for all of them.
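
A minimal sketch of that timing choice follows. The class, its method names, and the injectable millisecond clock are hypothetical and exist only to make the behavior easy to see; the real tracking lives in the classes listed above.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

/** Hypothetical per-sender timer; not the actual BlockingMultiStreamConsumer code. */
final class SenderTimingSketch {
  private final LongSupplier clockMs;
  private final long startMs;
  private final Map<String, Long> elapsedMsBySender = new LinkedHashMap<>();

  SenderTimingSketch(LongSupplier clockMs) {
    this.clockMs = clockMs;
    // Captured when the consumer is constructed, not when the query starts.
    this.startMs = clockMs.getAsLong();
  }

  /** Records the elapsed time for a sender when its EOS block is consumed. */
  void onEos(String senderKey) {
    elapsedMsBySender.putIfAbsent(senderKey, clockMs.getAsLong() - startMs);
  }

  Map<String, Long> elapsedMsBySender() {
    return Collections.unmodifiableMap(elapsedMsBySender);
  }
}
```

If a pipeline breaker has already blocked for 1000ms before the consumer is built, a fast sender whose EOS is consumed 5ms after construction is recorded as 5ms in this sketch rather than roughly 1005ms.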
   
   These per-sender timings are encoded as `"hostname|port=elapsedMs;..."` in a 
new `UPSTREAM_SERVER_RESPONSE_TIMES_MS` stat key on 
`BaseMailboxReceiveOperator.StatKey` and propagated up through the query stats. 
A serialized string is used because it is more straightforward than adding a 
Map type to the `StatKey` system.
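
For illustration, the string format could be encoded and decoded roughly as below. The class and method names are hypothetical, and the handling of malformed or empty entries is an assumption rather than the behavior of `AdaptiveRoutingUpstreamTimings`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical codec for the "hostname|port=elapsedMs;..." format. */
final class UpstreamTimingsCodecSketch {
  static String encode(Map<String, Long> elapsedMsBySender) {
    StringJoiner joiner = new StringJoiner(";");
    for (Map.Entry<String, Long> entry : elapsedMsBySender.entrySet()) {
      joiner.add(entry.getKey() + "=" + entry.getValue());
    }
    return joiner.toString();
  }

  static Map<String, Long> decode(String encoded) {
    Map<String, Long> result = new LinkedHashMap<>();
    if (encoded == null || encoded.isEmpty()) {
      return result;
    }
    for (String entry : encoded.split(";")) {
      int eq = entry.lastIndexOf('=');
      if (eq <= 0) {
        continue; // assumption: silently skip empty or malformed entries
      }
      try {
        result.put(entry.substring(0, eq), Long.parseLong(entry.substring(eq + 1)));
      } catch (NumberFormatException e) {
        // assumption: skip entries whose value is not a number
      }
    }
    return result;
  }
}
```

With this sketch, the senders `server-1|8442` at 12ms and `server-2|8442` at 1200ms encode to `server-1|8442=12;server-2|8442=1200`.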
   
   **3. Broker reads leaf-server timings**, resolves `hostname|mailboxPort` 
sender keys to full instance IDs, and accumulates the maximum observed latency 
per instance across all trusted stages. A `decrementedServers` set prevents 
double-recording.
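
The max-per-instance accumulation might look like the sketch below. The lookup map from sender key to instance ID, the class name, and the decision to ignore unresolvable senders are all assumptions made for illustration; the `decrementedServers` bookkeeping is not modeled here.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical broker-side aggregation; not the actual Pinot broker code. */
final class BrokerLatencyAggregationSketch {
  /**
   * @param perStageTimings decoded "hostname|mailboxPort" -> elapsedMs maps, one per trusted stage
   * @param senderKeyToInstanceId hypothetical lookup from sender key to full instance ID
   */
  static Map<String, Long> maxLatencyMsByInstance(
      List<Map<String, Long>> perStageTimings, Map<String, String> senderKeyToInstanceId) {
    Map<String, Long> maxByInstance = new HashMap<>();
    for (Map<String, Long> stageTimings : perStageTimings) {
      for (Map.Entry<String, Long> entry : stageTimings.entrySet()) {
        String instanceId = senderKeyToInstanceId.get(entry.getKey());
        if (instanceId == null) {
          continue; // assumption: ignore senders that cannot be resolved
        }
        // Keep the largest latency seen for this instance across stages.
        maxByInstance.merge(instanceId, entry.getValue(), Long::max);
      }
    }
    return maxByInstance;
  }
}
```

If one stage reports 10ms for a server and another reports 30ms, the sketch records 30ms for that instance.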
   
   **4. Query timeout:** On query cancellation (e.g. when the timeout is 
reached), we try to cancel with stats. Completed timings are preserved, and 
the elapsed time is injected for senders that are still pending.
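
One way to picture the cancellation path, assuming "elapsed time" means the time from consumer construction until the cancellation is observed. The class, method, and parameter names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical merge of completed and pending sender timings on cancellation. */
final class CancelledTimingsSketch {
  static Map<String, Long> timingsOnCancel(
      Map<String, Long> completedMs, Set<String> allSenders, long startMs, long nowMs) {
    // Senders whose EOS already arrived keep their recorded timings.
    Map<String, Long> result = new LinkedHashMap<>(completedMs);
    long elapsedSoFarMs = Math.max(0L, nowMs - startMs);
    for (String sender : allSenders) {
      // Pending senders are charged the time elapsed so far (an assumption).
      result.putIfAbsent(sender, elapsedSoFarMs);
    }
    return result;
  }
}
```

With a 1s timeout, a sender that finished in 20ms keeps its 20ms in this sketch, while a sender that never sent EOS is reported at about 1000ms.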
   
   ### Testing
   
   **1. Performance impact**: We ran some performance tests and saw no impact. 
   
   **2. Manually with tc**
   
   Choose a server that we want to induce latency on. `ssh` to that server and:
   
   ```
   sudo tc qdisc add dev ens5 root handle 1: prio
   sudo tc qdisc add dev ens5 parent 1:3 handle 30: netem delay 1200ms
   
   sudo tc filter add dev ens5 protocol ip parent 1:0 prio 3 u32 \
     match ip dport 8442 0xffff flowid 1:3
   ```
   
   We induced latency only for MSE to validate that adaptive routing can be 
influenced by only MSE queries.
   
   To reset the latency:
   
   ```
   sudo tc qdisc del dev ens5 root # cleanup
   ```
   
   We saw the score spike for the targeted servers. 
   
   **3. Chaos Agent**
   
   We kicked off a chaos agent run that does:
   
   1. 90s of 600ms **MSE latency**
   2. 30s with no faults
   3. 90s of 600ms **SSE latency**
   4. 90s with no faults
   5. 90s of 1200ms **MSE latency**
   6. 30s with no faults
   7. 90s of 1200ms **SSE latency**
   
   (query timeout is 1s). 
   
   <img width="1462" height="916" alt="image" src="https://github.com/user-attachments/assets/42fea0fc-b2ed-4070-be6b-9e60f7446a40" />


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

