dang-stripe commented on code in PR #18538:
URL: https://github.com/apache/pinot/pull/18538#discussion_r3282836545
##########
pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java:
##########
@@ -642,35 +646,51 @@ private BrokerResponse
query(QueryEnvironment.CompiledQuery query, long requestI
return new BrokerResponseNative(QueryErrorCode.EXECUTION_TIMEOUT);
}
- int stageCount = dispatchableSubPlan.getQueryStageMap().size();
- int opChainCount = dispatchableSubPlan.getQueryStageMap().values().stream()
- .mapToInt(stage -> stage.getWorkerMetadataList().size())
- .sum();
-
try {
String clientRequestId =
extractClientRequestId(query.getSqlNodeAndOptions());
onQueryStart(requestId, clientRequestId, query.getTextQuery());
long executionStartTimeNs = System.nanoTime();
- _stagesStartedMeter.mark(stageCount);
- _opchainsStartedMeter.mark(opChainCount);
-
+ // All leaf stages empty means every segment was pruned (or the table
has no data) and no
+ // server dispatch is needed. The paths differ because the normal path
dispatches to servers,
+ // tracks stage/opchain meters, and must re-throw QueryException from
server responses.
QueryDispatcher.QueryResult queryResults;
- try {
- queryResults = _queryDispatcher.submitAndReduce(requestContext,
dispatchableSubPlan, timer.getRemainingTimeMs(),
- query.getOptions());
- } catch (QueryException e) {
- throw e;
- } catch (Throwable t) {
- QueryErrorCode queryErrorCode = QueryErrorCode.QUERY_EXECUTION;
- String consolidatedMessage =
ExceptionUtils.consolidateExceptionTraces(t);
- LOGGER.error("Caught exception executing request {}: {}, {}",
requestId, query, consolidatedMessage);
- requestContext.setErrorCode(queryErrorCode);
- return new BrokerResponseNative(queryErrorCode, consolidatedMessage);
- } finally {
- _stagesFinishedMeter.mark(stageCount);
- _opchainsCompletedMeter.mark(opChainCount);
- onQueryFinish(requestId);
+ if (allLeafStagesEmpty) {
+ try {
+ queryResults = QueryDispatcher.runReducer(dispatchableSubPlan,
query.getOptions(), _mailboxService);
+ } catch (Throwable t) {
Review Comment:
good catch, fixed
##########
pinot-query-planner/src/main/java/org/apache/pinot/query/planner/physical/PinotDispatchPlanner.java:
##########
@@ -120,11 +128,91 @@ private void runValidations(PlanFragment planFragment,
DispatchablePlanContext c
private static DispatchableSubPlan finalizeDispatchableSubPlan(PlanFragment
subPlanRoot,
DispatchablePlanContext dispatchablePlanContext) {
+ // TODO: Physical Optimizer path does not track empty leaf stages. To
support short-circuit,
+ // check if all leaf TableScanMetadata have empty workerIdToSegmentsMap.
+ boolean allLeafStagesEmpty =
dispatchablePlanContext.isAllNonReplicatedLeafStagesEmpty();
+ if (allLeafStagesEmpty &&
hasNonEmptyReplicatedLeaf(dispatchablePlanContext.getDispatchablePlanMetadataMap()))
{
+ allLeafStagesEmpty = false;
+ }
+ Map<Integer, DispatchablePlanFragment> fragmentMap =
+
dispatchablePlanContext.constructDispatchablePlanFragmentMap(subPlanRoot);
+ if (allLeafStagesEmpty) {
+ rewriteReduceStageForEmptyLeaves(fragmentMap);
Review Comment:
added
##########
pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiStageEngineIntegrationTest.java:
##########
@@ -219,6 +219,104 @@ public void testSingleValueQuery()
assertEquals(joinResult, expectedResult);
}
+ @Test
+ public void testAllLeafStagesEmptyBrokerResponses()
Review Comment:
👍 added the tests
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]