adarshsanjeev commented on code in PR #14994:
URL: https://github.com/apache/druid/pull/14994#discussion_r1355075260
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1772,27 +1767,32 @@ private static QueryDefinition makeQueryDefinition(
return queryDef;
} else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
- // attaching new query results stage always.
+ // attaching new query results stage if the final stage does sort during shuffle so that results are ordered.
StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition();
- final QueryDefinitionBuilder builder = QueryDefinition.builder();
- for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
- builder.add(StageDefinition.builder(stageDef));
- }
-
- builder.add(StageDefinition.builder(queryDef.getNextStageNumber())
- .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
- .maxWorkerCount(tuningConfig.getMaxNumWorkers())
- .signature(finalShuffleStageDef.getSignature())
- .shuffleSpec(null)
- .processorFactory(new QueryResultFrameProcessorFactory())
- );
+ if (finalShuffleStageDef.doesSortDuringShuffle()) {
+ final QueryDefinitionBuilder builder = QueryDefinition.builder();
+ for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
Review Comment:
nit: could use org.apache.druid.msq.kernel.QueryDefinitionBuilder#addAll(org.apache.druid.msq.kernel.QueryDefinition) here
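For readers unfamiliar with the suggestion, here is a minimal sketch of the refactor using hypothetical, simplified stand-ins (AddAllSketch, QueryDefBuilder, Stage and QueryDef are illustrative names, not the real org.apache.druid.msq.kernel types). Rather than looping over the existing stages and adding each one, a single addAll call copies the existing definition before the results stage is appended. It omits the doesSortDuringShuffle() check and the per-stage StageDefinition.builder(...) wrapping from the real code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the MSQ kernel classes; this is NOT Druid's API.
final class AddAllSketch {
    private AddAllSketch() {}

    // A stage is reduced to its number and a label for illustration.
    record Stage(int number, String name) {}

    // A query definition is just an ordered list of stages numbered 0..n-1 here.
    record QueryDef(List<Stage> stages) {
        // Assumes contiguous stage numbers starting at 0.
        int nextStageNumber() {
            return stages.size();
        }
    }

    static final class QueryDefBuilder {
        private final List<Stage> stages = new ArrayList<>();

        QueryDefBuilder add(Stage stage) {
            stages.add(stage);
            return this;
        }

        // Copies every stage of an existing definition in order: the same thing
        // the hand-written for-loop in the diff does one stage at a time.
        QueryDefBuilder addAll(QueryDef def) {
            for (Stage stage : def.stages()) {
                add(stage);
            }
            return this;
        }

        QueryDef build() {
            return new QueryDef(List.copyOf(stages));
        }
    }

    // Rebuilds the plan with one extra results stage appended after the existing ones.
    static QueryDef withResultsStage(QueryDef queryDef) {
        return new QueryDefBuilder()
            .addAll(queryDef)
            .add(new Stage(queryDef.nextStageNumber(), "query-results"))
            .build();
    }
}
```

Under these assumptions, a definition with stages 0 (scan) and 1 (sort) comes back with three stages, the last one numbered 2.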
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1693,8 +1687,9 @@ private static QueryDefinition makeQueryDefinition(
shuffleSpecFactory = ShuffleSpecFactories.singlePartition();
queryToPlan = querySpec.getQuery();
} else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
- // we add a final stage which generates one partition per worker.
- shuffleSpecFactory = ShuffleSpecFactories.globalSortWithMaxPartitionCount(tuningConfig.getMaxNumWorkers());
+ shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(
+ MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())
Review Comment:
I might be missing something somewhere else in this PR, but doesn't
GlobalSortTargetSizeShuffleSpec enforce the limit on the total partition size
summed across all workers? Since we create a new page for each worker-partition
combination, would the limit be enforced per page?
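To make the question concrete, here is a hypothetical model. It is not Druid code and makes no claim about how GlobalSortTargetSizeShuffleSpec actually sizes partitions. It assumes the target bounds each partition's row count summed across workers, and that one page is written per (worker, partition) pair that holds rows. PageSizeSketch, pageSizes and partitionTotal are illustrative names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Druid code: rows[p][w] is the number of rows that
// worker w holds for partition p after the shuffle.
final class PageSizeSketch {
    private PageSizeSketch() {}

    // One "page" per (worker, partition) pair that actually holds rows,
    // listed partition by partition, then worker by worker.
    static List<Integer> pageSizes(int[][] rows) {
        List<Integer> pages = new ArrayList<>();
        for (int[] partition : rows) {
            for (int workerRows : partition) {
                if (workerRows > 0) {
                    pages.add(workerRows);
                }
            }
        }
        return pages;
    }

    // Rows of one partition summed across all workers: the quantity a
    // target-size shuffle spec would bound under the assumption above.
    static int partitionTotal(int[] partition) {
        int total = 0;
        for (int workerRows : partition) {
            total += workerRows;
        }
        return total;
    }
}
```

For example, with a target of 100 rows and rows = {{40, 35, 25}, {100, 0, 0}}, both partitions total 100, but pageSizes returns [40, 35, 25, 100]. Under this assumption no single page exceeds the target, yet the first partition is split into three smaller pages, which is the per-page versus per-partition mismatch the question is about.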
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]