yashmayya commented on code in PR #18538:
URL: https://github.com/apache/pinot/pull/18538#discussion_r3277830092
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -219,6 +219,104 @@ public void testSingleValueQuery()
assertEquals(joinResult, expectedResult);
}
+ @Test
+ public void testAllLeafStagesEmptyBrokerResponses()
Review Comment:
Coverage here is solid for COUNT/SUM/GROUP BY/HAVING, but the cases most
likely to silently regress on empty input aren't exercised: MIN/MAX (without
null handling these default to `POSITIVE_INFINITY` and `NEGATIVE_INFINITY`
respectively; the dispatch path returns the same sentinels from servers, so
any divergence here is a real wrong-result risk and worth pinning), AVG
(returns null today), `HAVING SUM(...) IS NULL`, and the multi-aggregate
row-alignment case (e.g. `SELECT MIN(x), MAX(x), AVG(x), COUNT(*) FROM ...`).
The PR description also mentions that window functions over empty input work;
it would be good to add at least one `OVER ()` case here so that's pinned.
Not a blocker, but cheap to add now.
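
To illustrate why the empty-input behaviour of MIN/MAX/AVG is worth pinning,
here is a minimal standalone sketch. It is not Pinot code:
`EmptyInputAggregationSketch` and its methods are hypothetical, and it assumes
MIN folds from `POSITIVE_INFINITY`, MAX folds from `NEGATIVE_INFINITY`, and AVG
has no value on empty input.

```java
import java.util.OptionalDouble;
import java.util.stream.DoubleStream;

// Hypothetical stand-in for aggregations without null handling (not Pinot code).
final class EmptyInputAggregationSketch {
  // MIN folded from an identity sentinel: empty input yields the sentinel itself.
  static double minWithSentinel(double[] values) {
    return DoubleStream.of(values).reduce(Double.POSITIVE_INFINITY, Math::min);
  }

  // MAX folded from the opposite sentinel.
  static double maxWithSentinel(double[] values) {
    return DoubleStream.of(values).reduce(Double.NEGATIVE_INFINITY, Math::max);
  }

  // AVG has no identity element, so empty input is modeled as "no value".
  static OptionalDouble avg(double[] values) {
    return DoubleStream.of(values).average();
  }

  public static void main(String[] args) {
    double[] empty = new double[0];
    System.out.println("MIN over empty input: " + minWithSentinel(empty));
    System.out.println("MAX over empty input: " + maxWithSentinel(empty));
    System.out.println("AVG present on empty input: " + avg(empty).isPresent());
  }
}
```

A test on the short-circuit path would assert the same values the dispatch
path produces, whatever they are, rather than the values assumed above.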
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -642,35 +646,51 @@ private BrokerResponse query(QueryEnvironment.CompiledQuery query, long requestI
return new BrokerResponseNative(QueryErrorCode.EXECUTION_TIMEOUT);
}
- int stageCount = dispatchableSubPlan.getQueryStageMap().size();
- int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream()
- .mapToInt(stage -> stage.getWorkerMetadataList().size())
- .sum();
-
try {
String clientRequestId = extractClientRequestId(query.getSqlNodeAndOptions());
onQueryStart(requestId, clientRequestId, query.getTextQuery());
long executionStartTimeNs = System.nanoTime();
- _stagesStartedMeter.mark(stageCount);
- _opchainsStartedMeter.mark(opChainCount);
-
+ // All leaf stages empty means every segment was pruned (or the table has no data) and no
+ // server dispatch is needed. The paths differ because the normal path dispatches to servers,
+ // tracks stage/opchain meters, and must re-throw QueryException from server responses.
QueryDispatcher.QueryResult queryResults;
- try {
- queryResults = _queryDispatcher.submitAndReduce(requestContext, dispatchableSubPlan, timer.getRemainingTimeMs(),
- query.getOptions());
- } catch (QueryException e) {
- throw e;
- } catch (Throwable t) {
- QueryErrorCode queryErrorCode = QueryErrorCode.QUERY_EXECUTION;
- String consolidatedMessage = ExceptionUtils.consolidateExceptionTraces(t);
- LOGGER.error("Caught exception executing request {}: {}, {}", requestId, query, consolidatedMessage);
- requestContext.setErrorCode(queryErrorCode);
- return new BrokerResponseNative(queryErrorCode, consolidatedMessage);
- } finally {
- _stagesFinishedMeter.mark(stageCount);
- _opchainsCompletedMeter.mark(opChainCount);
- onQueryFinish(requestId);
+ if (allLeafStagesEmpty) {
+ try {
+ queryResults = QueryDispatcher.runReducer(dispatchableSubPlan, query.getOptions(), _mailboxService);
+ } catch (Throwable t) {
Review Comment:
The dispatch branch above re-throws `QueryException` so typed errors (auth,
validation, etc.) propagate with their original error codes, but this branch
only has `catch (Throwable t)` and wraps everything into `QUERY_EXECUTION`.
`runReducer` can surface typed `QueryException`s too (e.g. from operator
initialization). Suggest adding `catch (QueryException e) { throw e; }` before
the `Throwable` arm to mirror the dispatch path.
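
A minimal sketch of the suggested catch ordering follows. It is standalone and
not Pinot code: `TypedQueryError` is a hypothetical stand-in for
`QueryException`, and `GENERIC_EXECUTION_ERROR` is a made-up code standing in
for `QUERY_EXECUTION`.

```java
// Hypothetical typed exception standing in for Pinot's QueryException.
final class TypedQueryError extends RuntimeException {
  final int errorCode;

  TypedQueryError(int errorCode, String message) {
    super(message);
    this.errorCode = errorCode;
  }
}

final class CatchOrderSketch {
  // Hypothetical code standing in for the generic QUERY_EXECUTION wrapping.
  static final int GENERIC_EXECUTION_ERROR = -1;

  // Returns 0 on success and the generic code for untyped failures;
  // typed errors are re-thrown so the caller sees the original code.
  static int run(Runnable reducer) {
    try {
      reducer.run();
      return 0;
    } catch (TypedQueryError e) {
      // The more specific arm must come before Throwable to take effect.
      throw e;
    } catch (Throwable t) {
      return GENERIC_EXECUTION_ERROR;
    }
  }

  public static void main(String[] args) {
    System.out.println(run(() -> { }));
    System.out.println(run(() -> { throw new IllegalStateException("boom"); }));
    try {
      run(() -> { throw new TypedQueryError(42, "typed"); });
    } catch (TypedQueryError e) {
      System.out.println("typed error preserved, code " + e.errorCode);
    }
  }
}
```

Without the first arm, the typed error would be caught by the `Throwable` arm
and reported under the generic code.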
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java:
##########
@@ -120,11 +128,91 @@ private void runValidations(PlanFragment planFragment, DispatchablePlanContext c
private static DispatchableSubPlan finalizeDispatchableSubPlan(PlanFragment subPlanRoot,
DispatchablePlanContext dispatchablePlanContext) {
+ // TODO: Physical Optimizer path does not track empty leaf stages. To support short-circuit,
+ // check if all leaf TableScanMetadata have empty workerIdToSegmentsMap.
+ boolean allLeafStagesEmpty = dispatchablePlanContext.isAllNonReplicatedLeafStagesEmpty();
+ if (allLeafStagesEmpty && hasNonEmptyReplicatedLeaf(dispatchablePlanContext.getDispatchablePlanMetadataMap())) {
+ allLeafStagesEmpty = false;
+ }
+ Map<Integer, DispatchablePlanFragment> fragmentMap =
+ dispatchablePlanContext.constructDispatchablePlanFragmentMap(subPlanRoot);
+ if (allLeafStagesEmpty) {
+ rewriteReduceStageForEmptyLeaves(fragmentMap);
Review Comment:
After this rewrite stage 0 no longer references any `MailboxReceiveNode`,
but the original leaf/intermediate fragments are still left in `fragmentMap`.
Downstream consumers walking the map (`PhysicalExplainPlanVisitor`,
`fillOldBrokerResponseStats`) will then see orphan stages, e.g. `EXPLAIN PLAN
FOR <fully-pruned query>` shows stage 0 as the inlined tree but stages 1+ still
appear in the output. Suggest `fragmentMap.keySet().retainAll(Set.of(0));`
after the rewrite call. As a bonus this also avoids a latent footgun:
`inlineAllLeafStagesEmptyInputs` returns subtrees by reference when no input
changes, so `setStageIdRecursively(inlinedRoot, 0)` can currently mutate
`stageId` on nodes still reachable from the orphan fragments — breaking the
invariant that every node under fragment N has stageId N. Dropping the orphan
stages closes both.
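
For reference, a standalone sketch of what the `retainAll` call does to a
mutable map. The `String` values are a hypothetical stand-in for
`DispatchablePlanFragment`, and the sketch assumes `fragmentMap` is backed by a
mutable map such as `HashMap`; an unmodifiable map would throw instead.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: String values stand in for DispatchablePlanFragment.
final class PruneOrphanStagesSketch {
  // Removes every entry except stage 0 by mutating the map through its key-set view.
  static Map<Integer, String> keepOnlyRootStage(Map<Integer, String> fragmentMap) {
    fragmentMap.keySet().retainAll(Set.of(0));
    return fragmentMap;
  }

  public static void main(String[] args) {
    Map<Integer, String> fragmentMap = new HashMap<>();
    fragmentMap.put(0, "reduce stage (inlined tree)");
    fragmentMap.put(1, "orphan intermediate stage");
    fragmentMap.put(2, "orphan leaf stage");
    System.out.println(keepOnlyRootStage(fragmentMap).keySet());
  }
}
```

Because `keySet()` is a live view, removing keys from it removes the
corresponding entries, so anything walking the map afterwards only sees
stage 0.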
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]