gortiz commented on code in PR #18458:
URL: https://github.com/apache/pinot/pull/18458#discussion_r3273981613


##########
pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java:
##########
@@ -189,10 +191,189 @@ public QueryResult submitAndReduce(RequestContext context, DispatchableSubPlan d
     }
   }
 
-  /// Tries to recover from an exception thrown during query dispatching.
+  /// Streaming variant of {@link #submitAndReduce}: opens one {@code SubmitWithStream} bidi RPC per server, runs the
+  /// broker's stage 0 reducer, and once the receiving mailbox finishes awaits the per-stage stats with early
+  /// completion (returns as soon as every expected opchain has reported, or when the wait window fires — whichever
+  /// happens first). Stats from the session accumulator are then merged into the broker's local stage 0 stats to
+  /// build the final {@link QueryResult}.
+  ///
+  /// The wait window is bounded by the query's remaining timeout: if {@code submitWithStream + runReducer} consumed
+  /// most of the budget, the per-stage stats may end up partial (visible via the per-stage {@code mergeFailed} /
+  /// {@code missing} counts the session exposes).
+  ///
+  /// Cancel is handled via {@link StreamingQuerySession#fanOutCancel()} — no unary Cancel RPCs are issued for this
+  /// query path. On any error, fan-out cancel is broadcast over the open streams, then the broker waits for remaining
+  /// stats before building the final result.
+  ///
+  /// <b>Mixed-version policy.</b> No automatic fallback to the unary {@link #submit} path. Enabling
+  /// {@link CommonConstants.Broker.Request.QueryOptionKey#STREAM_STATS} requires every server in the
+  /// cluster to implement {@code SubmitWithStream}; if any server returns {@code UNIMPLEMENTED} or any other
+  /// transport error during dispatch, {@link #submitWithStream} surfaces the throwable through the ack queue,
+  /// {@link #processResults} throws, and this method fans out cancel via the session before propagating the failure.
+  private QueryResult submitAndReduceWithStream(RequestContext context, DispatchableSubPlan dispatchableSubPlan,
+      long timeoutMs, Map<String, String> queryOptions)
+      throws Exception {
+    long requestId = context.getRequestId();
+    long deadlineMs = System.currentTimeMillis() + timeoutMs;
+    Set<QueryServerInstance> servers = new HashSet<>();
+
+    // The session's expected-opchain count must equal the total number of opchains across every (server, non-root
+    // stage) pair — that's how many OpChainComplete messages we expect to receive.
+    Set<DispatchablePlanFragment> stagePlansWithoutRoot = dispatchableSubPlan.getQueryStagesWithoutRoot();
+    int totalExpected = 0;
+    Map<Integer, Integer> expectedByStage = new HashMap<>();
+    for (DispatchablePlanFragment stagePlan : stagePlansWithoutRoot) {
+      int stageId = stagePlan.getPlanFragment().getFragmentId();
+      int stageCount = 0;
+      for (List<Integer> workerIds : stagePlan.getServerInstanceToWorkerIdMap().values()) {
+        stageCount += workerIds.size();
+      }
+      totalExpected += stageCount;
+      expectedByStage.put(stageId, stageCount);
+    }
+    StreamingQuerySession session = new StreamingQuerySession(requestId, totalExpected);
+
+    try {
+      submitWithStream(requestId, dispatchableSubPlan, timeoutMs, servers, queryOptions, session);
+      QueryResult brokerResult = runReducer(dispatchableSubPlan, queryOptions, _mailboxService);
+
+      // Receiving mailbox finished. Wait for stats: returns true as soon as every opchain has reported, or false
+      // when the timeout fires.
+      long remainingMs = Math.max(0, deadlineMs - System.currentTimeMillis());
+      boolean fullCoverage = session.awaitCompletion(remainingMs, TimeUnit.MILLISECONDS);

Review Comment:
   The code now defines `STATS_DRAIN_ON_SUCCESS_MS = 50L` and uses `Math.min(STATS_DRAIN_ON_SUCCESS_MS, remainingMs)` for this wait.
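
   For readers following the thread, here is a minimal sketch of what a capped stats wait of that shape could look like. Only `STATS_DRAIN_ON_SUCCESS_MS`, its value `50L`, and the `Math.min(...)` expression come from the comment above. The class `StatsDrainSketch`, the methods `drainWindowMs` and `awaitStats`, and the use of a `CountDownLatch` in place of `StreamingQuerySession#awaitCompletion` are assumptions made for illustration and are not the PR's actual code.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class StatsDrainSketch {
  // Value quoted in the review comment; everything else in this class is hypothetical.
  static final long STATS_DRAIN_ON_SUCCESS_MS = 50L;

  /** Wait window: the drain cap, or the remaining query budget if that is smaller; never negative. */
  static long drainWindowMs(long deadlineMs, long nowMs) {
    long remainingMs = Math.max(0, deadlineMs - nowMs);
    return Math.min(STATS_DRAIN_ON_SUCCESS_MS, remainingMs);
  }

  /**
   * Hypothetical stand-in for the session's await call. Returns true as soon as every
   * pending opchain has reported, or false once the capped window elapses.
   */
  static boolean awaitStats(CountDownLatch pendingOpChains, long deadlineMs) {
    long windowMs = drainWindowMs(deadlineMs, System.currentTimeMillis());
    try {
      return pendingOpChains.await(windowMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      // Restore the interrupt flag and report partial coverage.
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    long deadlineMs = System.currentTimeMillis() + 10_000L;

    // All opchains already reported: returns immediately with full coverage.
    CountDownLatch allReported = new CountDownLatch(0);
    System.out.println("full coverage: " + awaitStats(allReported, deadlineMs));

    // One opchain never reports: gives up after at most the drain cap, not the full 10 s budget.
    CountDownLatch oneMissing = new CountDownLatch(1);
    System.out.println("full coverage: " + awaitStats(oneMissing, deadlineMs));
  }
}
```

   Under these assumptions, a successful query waits at most 50 ms for late stats instead of the whole remaining timeout, while a nearly exhausted budget still shortens the window further.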


