yashmayya commented on code in PR #18553:
URL: https://github.com/apache/pinot/pull/18553#discussion_r3320826336


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -167,10 +168,40 @@ public void start() {
   public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
       Map<String, String> queryOptions)
       throws Exception {
+    return submitAndReduce(context, dispatchableSubPlan, timeoutMs, queryOptions, null);
+  }
+
+  /// Same as {@link #submitAndReduce(RequestContext, DispatchableSubPlan, long, Map)} but records per-server
+  /// in-flight request statistics into {@code statsManager} for use by the adaptive query router.
+  /// When {@code statsManager} is non-null:
+  /// <ul>
+  ///   <li>Each leaf server is registered as having one more in-flight request via
+  ///       {@link ServerRoutingStatsManager#recordStatsForQuerySubmission} after the fan-out begins.</li>
+  ///   <li>After the full fan-out completes (or fails), each server is decremented via
+  ///       {@link ServerRoutingStatsManager#recordStatsUponResponseArrival} with {@code latency = -1}
+  ///       (no latency is recorded at this stage).</li>
+  /// </ul>
+  /// TODO: Replace the coarse end-of-fanout decrement with per-sender arrival once per-sender EOS
+  ///       interception is in place, and record real leaf-stage latency at that point.
+  public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
+      Map<String, String> queryOptions, @Nullable ServerRoutingStatsManager statsManager)
+      throws Exception {
     long requestId = context.getRequestId();
     Set<QueryServerInstance> servers = new HashSet<>();
+    // Tracks servers where recordStatsForQuerySubmission was actually called, so the finally block only
+    // decrements servers that were incremented — guarding against a partial failure in submit().
+    Set<QueryServerInstance> incrementedServers = new HashSet<>();
     try {
       submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions);
+      // The SSE engine increments before `submit`, but here we increment after because `submit` populates
+      // the list of servers. Getting the list of servers before calling `submit` would expose
+      // implementation details of `submit`.
+      if (statsManager != null) {
+        for (QueryServerInstance server : servers) {
+          statsManager.recordStatsForQuerySubmission(requestId, server.getInstanceId());

Review Comment:
   Fair — `runReducer` is meaningfully longer than the increment window for any 
non-trivial MSE query, and a month of clean production data on ~10 clusters is 
a much stronger signal than my code-reading. Dropping this.
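
   For anyone reading along, the bookkeeping in the hunk above can be sketched
   in isolation. This is only an illustration of the "decrement only what was
   incremented" guard, not the PR's code: `InFlightCounter` and
   `DispatchSketch` are hypothetical stand-ins for `ServerRoutingStatsManager`
   and `QueryDispatcher`, servers are plain strings, and the `failAfter`
   parameter exists only to simulate a failure partway through.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the stats manager; NOT Pinot's ServerRoutingStatsManager.
final class InFlightCounter {
  private final Map<String, Integer> inFlight = new HashMap<>();

  void increment(String serverId) {
    inFlight.merge(serverId, 1, Integer::sum);
  }

  void decrement(String serverId) {
    inFlight.merge(serverId, -1, Integer::sum);
  }

  int get(String serverId) {
    return inFlight.getOrDefault(serverId, 0);
  }
}

// Hypothetical stand-in for the dispatcher; only the bookkeeping is modeled.
final class DispatchSketch {
  // Increments each server, then decrements in `finally` only the servers that
  // were actually incremented. A negative failAfter means "never fail".
  static void dispatch(List<String> servers, InFlightCounter counter, int failAfter) {
    Set<String> incremented = new HashSet<>();
    try {
      int index = 0;
      for (String server : servers) {
        if (index == failAfter) {
          // Simulated failure before this server is incremented.
          throw new IllegalStateException("simulated failure at " + server);
        }
        counter.increment(server);
        incremented.add(server);
        index++;
      }
      // The reduce phase would run here in the real dispatcher.
    } finally {
      // Servers that were never incremented are left untouched, so no counter goes negative.
      for (String server : incremented) {
        counter.decrement(server);
      }
    }
  }
}
```

   With three servers and `failAfter = 2`, the first two are incremented and
   then decremented in `finally`, and the third is never touched, so every
   counter ends at zero.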



##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -183,6 +214,11 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d
       cancel(requestId);
       throw e;
     } finally {
+      if (statsManager != null) {
+        for (QueryServerInstance server : incrementedServers) {
+          statsManager.recordStatsUponResponseArrival(requestId, server.getInstanceId(), -1);

Review Comment:
   OK to leave real latency for the follow-up PR. One small ask while it's a 
TODO though: can we just skip exporting `ADAPTIVE_SERVER_MSE_LATENCY_EMA` and 
`ADAPTIVE_SERVER_MSE_HYBRID_SCORE` from `exportStatsForMap` until then? Right 
now they'll publish a flat init value and anyone wiring a dashboard against the 
new gauges will think they're broken. Easy to drop the line, easy to add back 
when latency lands.
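
   A rough sketch of what I mean, in case it helps. I have not checked it
   against the actual shape of `exportStatsForMap`, so treat everything except
   the two gauge names as an assumption: `GaugeExportSketch`, the
   `latencyRecorded` flag, and `HYPOTHETICAL_IN_FLIGHT_GAUGE` are made up for
   illustration. Simply deleting the two export lines works just as well; the
   flag only makes the "add back later" step explicit.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; NOT the real exportStatsForMap in Pinot.
final class GaugeExportSketch {
  // Builds the gauge map, leaving out the latency-derived gauges until real
  // leaf-stage latency is being recorded (latencyRecorded == true).
  static Map<String, Double> export(int inFlight, double latencyEma, double hybridScore,
      boolean latencyRecorded) {
    Map<String, Double> gauges = new LinkedHashMap<>();
    // Placeholder name for whatever in-flight gauge is already exported.
    gauges.put("HYPOTHETICAL_IN_FLIGHT_GAUGE", (double) inFlight);
    if (latencyRecorded) {
      // Only meaningful once latency is no longer passed as -1.
      gauges.put("ADAPTIVE_SERVER_MSE_LATENCY_EMA", latencyEma);
      gauges.put("ADAPTIVE_SERVER_MSE_HYBRID_SCORE", hybridScore);
    }
    return gauges;
  }
}
```

   Until the follow-up lands, callers would pass `latencyRecorded = false`, so
   dashboards never see the flat init values.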



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

