timothy-e commented on code in PR #18553:
URL: https://github.com/apache/pinot/pull/18553#discussion_r3319915693
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -167,10 +168,40 @@ public void start() {
public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
Map<String, String> queryOptions)
throws Exception {
+ return submitAndReduce(context, dispatchableSubPlan, timeoutMs, queryOptions, null);
+ }
+
+ /// Same as {@link #submitAndReduce(RequestContext, DispatchableSubPlan, long, Map)} but records per-server
+ /// in-flight request statistics into {@code statsManager} for use by the adaptive query router.
+ /// When {@code statsManager} is non-null:
+ /// <ul>
+ /// <li>Each leaf server is registered as having one more in-flight request via
+ /// {@link ServerRoutingStatsManager#recordStatsForQuerySubmission} after the fan-out begins.</li>
+ /// <li>After the full fan-out completes (or fails), each server is decremented via
+ /// {@link ServerRoutingStatsManager#recordStatsUponResponseArrival} with {@code latency = -1}
+ /// (no latency is recorded at this stage).</li>
+ /// </ul>
+ /// TODO: Replace the coarse end-of-fanout decrement with per-sender arrival once per-sender EOS
+ /// interception is in place, and record real leaf-stage latency at that point.
+ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan dispatchableSubPlan, long timeoutMs,
+ Map<String, String> queryOptions, @Nullable ServerRoutingStatsManager statsManager)
+ throws Exception {
long requestId = context.getRequestId();
Set<QueryServerInstance> servers = new HashSet<>();
+ // Tracks servers where recordStatsForQuerySubmission was actually called, so the finally block only
+ // decrements servers that were incremented — guarding against a partial failure in submit().
+ Set<QueryServerInstance> incrementedServers = new HashSet<>();
try {
submit(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions);
+ // The SSE engine increments before `submit`, but here we increment after because `submit` populates
+ // the list of servers. Getting the list of servers before calling `submit` would expose
+ // implementation details of `submit`.
+ if (statsManager != null) {
+ for (QueryServerInstance server : servers) {
+ statsManager.recordStatsForQuerySubmission(requestId, server.getInstanceId());
Review Comment:
I don't think this is true. `submit` does an async dispatch, and we mark the
servers as incremented very shortly after submitting. `runReducer` has to fetch
the query results, which will generally take longer than for SSE queries. This
seems to me to be the same severity as in SSE adaptive routing, and that doesn't
seem to have caused any issues yet.
In practice, we haven't seen any negative in-flight request counts over the
last month on the ~10 Stripe clusters these stats are deployed to.
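To make the ordering argument concrete, here is a minimal Java sketch. It assumes a hypothetical `InFlightTracker` in place of `ServerRoutingStatsManager`, uses `CompletableFuture.supplyAsync` as a stand-in for the async `submit()`, and, for simplicity, passes the server list in rather than having `submit()` populate it. It mirrors the diff: fan out first, increment each dispatched server afterwards, and decrement only the servers in `incremented` from the `finally` block. Because the decrement runs on the same thread after the increments, a request's own decrement cannot run first in this sketch; whether the real stats manager applies updates synchronously is not shown in the diff.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

/** Hypothetical stand-in for the in-flight bookkeeping in ServerRoutingStatsManager. */
class InFlightTracker {
  private final Map<String, AtomicInteger> inFlight = new ConcurrentHashMap<>();
  // Lowest in-flight value observed for any server; stays 0 if no count goes negative.
  private final AtomicInteger minSeen = new AtomicInteger(0);

  void onSubmit(String server) {
    inFlight.computeIfAbsent(server, k -> new AtomicInteger()).incrementAndGet();
  }

  void onResponse(String server) {
    int now = inFlight.computeIfAbsent(server, k -> new AtomicInteger()).decrementAndGet();
    minSeen.accumulateAndGet(now, Math::min);
  }

  int get(String server) {
    AtomicInteger count = inFlight.get(server);
    return count == null ? 0 : count.get();
  }

  int minSeen() {
    return minSeen.get();
  }
}

class DispatchSketch {
  /**
   * Mirrors the ordering in the diff: async fan-out first, increment afterwards,
   * and decrement in finally only the servers that were actually incremented.
   */
  static void submitAndReduce(List<String> servers, InFlightTracker tracker, boolean failReduce) {
    Set<String> incremented = new HashSet<>();
    try {
      // Stand-in for the async submit(): each dispatch returns immediately.
      List<CompletableFuture<String>> futures = servers.stream()
          .map(s -> CompletableFuture.supplyAsync(() -> "result-from-" + s))
          .collect(Collectors.toList());
      for (String server : servers) {
        tracker.onSubmit(server);
        incremented.add(server);
      }
      // Stand-in for runReducer(): block until every server has answered.
      for (CompletableFuture<String> future : futures) {
        future.join();
      }
      if (failReduce) {
        throw new IllegalStateException("simulated reduce failure");
      }
    } finally {
      // Same thread, after the increments: this request's decrement cannot run first.
      for (String server : incremented) {
        tracker.onResponse(server);
      }
    }
  }
}
```

Both the success path and a simulated reducer failure return every counter to zero without it ever dipping below zero.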
##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -183,6 +214,11 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d
cancel(requestId);
throw e;
} finally {
+ if (statsManager != null) {
+ for (QueryServerInstance server : incrementedServers) {
+ statsManager.recordStatsUponResponseArrival(requestId, server.getInstanceId(), -1);
Review Comment:
Since this is a WIP feature and likely no one will immediately start using
the MSE latency stats, are you okay with leaving this as a TODO until the next
PR merges? I have it ready internally and the stats are working well for Stripe
so far; I just can't stack PRs on another repo.
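The `-1` passed to `recordStatsUponResponseArrival` in the `finally` block acts as a "no latency sample" sentinel. A minimal Java sketch of how a stats holder might honor it, assuming a hypothetical `ServerStatsSketch` class and an exponentially weighted moving average (EWMA) with an arbitrary smoothing factor; neither is taken from `ServerRoutingStatsManager` itself:

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical per-server stats holder (not Pinot's ServerRoutingStatsManager).
 * Tracks an in-flight count and an EWMA of latency; a negative latency means
 * "no sample", mirroring the latency = -1 convention in the PR's Javadoc.
 */
class ServerStatsSketch {
  // Assumed smoothing factor, chosen only for illustration.
  private static final double ALPHA = 0.5;

  private static final class Entry {
    int inFlight;
    double latencyEwmaMs = Double.NaN; // NaN until the first real latency sample
  }

  private final Map<String, Entry> stats = new HashMap<>();

  synchronized void recordSubmission(String server) {
    stats.computeIfAbsent(server, k -> new Entry()).inFlight++;
  }

  synchronized void recordResponse(String server, long latencyMs) {
    Entry e = stats.computeIfAbsent(server, k -> new Entry());
    e.inFlight--;
    if (latencyMs < 0) {
      return; // Sentinel: decrement in-flight only, leave latency untouched.
    }
    e.latencyEwmaMs = Double.isNaN(e.latencyEwmaMs)
        ? latencyMs
        : ALPHA * latencyMs + (1 - ALPHA) * e.latencyEwmaMs;
  }

  synchronized int inFlight(String server) {
    Entry e = stats.get(server);
    return e == null ? 0 : e.inFlight;
  }

  synchronized double latencyEwmaMs(String server) {
    Entry e = stats.get(server);
    return e == null ? Double.NaN : e.latencyEwmaMs;
  }
}
```

In this sketch, moving from the sentinel to a measured leaf-stage latency changes only the value passed to `recordResponse`; the in-flight accounting is identical in both cases.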