adarshsanjeev commented on code in PR #14994:
URL: https://github.com/apache/druid/pull/14994#discussion_r1355075260


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1772,27 +1767,32 @@ private static QueryDefinition makeQueryDefinition(
       return queryDef;
     } else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
 
-      // attaching new query results stage always.
+      // attaching new query results stage if the final stage does sort during shuffle so that results are ordered.
       StageDefinition finalShuffleStageDef = queryDef.getFinalStageDefinition();
-      final QueryDefinitionBuilder builder = QueryDefinition.builder();
-      for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {
-        builder.add(StageDefinition.builder(stageDef));
-      }
-
-      builder.add(StageDefinition.builder(queryDef.getNextStageNumber())
-                                 .inputs(new StageInputSpec(queryDef.getFinalStageDefinition().getStageNumber()))
-                                 .maxWorkerCount(tuningConfig.getMaxNumWorkers())
-                                 .signature(finalShuffleStageDef.getSignature())
-                                 .shuffleSpec(null)
-                                 .processorFactory(new QueryResultFrameProcessorFactory())
-      );
+      if (finalShuffleStageDef.doesSortDuringShuffle()) {
+        final QueryDefinitionBuilder builder = QueryDefinition.builder();
+        for (final StageDefinition stageDef : queryDef.getStageDefinitions()) {

Review Comment:
   nit: could use org.apache.druid.msq.kernel.QueryDefinitionBuilder#addAll(org.apache.druid.msq.kernel.QueryDefinition) here



##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java:
##########
@@ -1693,8 +1687,9 @@ private static QueryDefinition makeQueryDefinition(
       shuffleSpecFactory = ShuffleSpecFactories.singlePartition();
       queryToPlan = querySpec.getQuery();
     } else if (querySpec.getDestination() instanceof DurableStorageMSQDestination) {
-      // we add a final stage which generates one partition per worker.
-      shuffleSpecFactory = ShuffleSpecFactories.globalSortWithMaxPartitionCount(tuningConfig.getMaxNumWorkers());
+      shuffleSpecFactory = ShuffleSpecFactories.getGlobalSortWithTargetSize(
+          MultiStageQueryContext.getRowsPerPage(querySpec.getQuery().context())

Review Comment:
   I might be missing something elsewhere in this PR, but doesn't 
GlobalSortTargetSizeShuffleSpec enforce the limit on the total partition size 
summed across all workers? Since we create a new page for each worker-partition 
combination, would the limit be enforced?
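
   To make the question concrete, here is a rough, self-contained sketch of the arithmetic. It does not use any Druid classes: `PageSizeSketch`, `partitionSizes`, and `pageSizes` are hypothetical names, and the even split of a partition across workers is an assumption for illustration only. It assumes the target size bounds a partition's rows summed across workers and that each worker-partition combination becomes one page.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Druid.
public class PageSizeSketch {

    // Cut totalRows into partitions of at most targetRows each, where the
    // target applies to the partition's rows summed across all workers.
    static List<Long> partitionSizes(long totalRows, long targetRows) {
        List<Long> sizes = new ArrayList<>();
        for (long remaining = totalRows; remaining > 0; remaining -= targetRows) {
            sizes.add(Math.min(remaining, targetRows));
        }
        return sizes;
    }

    // Rows each worker holds for one partition, ASSUMING an even split.
    // Each entry corresponds to one page (one worker-partition combination).
    static long[] pageSizes(long partitionRows, int workers) {
        long[] pages = new long[workers];
        for (int w = 0; w < workers; w++) {
            pages[w] = partitionRows / workers + (w < partitionRows % workers ? 1 : 0);
        }
        return pages;
    }

    public static void main(String[] args) {
        long rowsPerPage = 100_000L; // stand-in for the rowsPerPage context value
        int workers = 4;             // stand-in for the number of workers
        for (long partitionRows : partitionSizes(250_000L, rowsPerPage)) {
            System.out.println(partitionRows + " rows -> pages "
                + Arrays.toString(pageSizes(partitionRows, workers)));
        }
    }
}
```

   Under these assumptions, a page never exceeds the target, because a worker's share of a partition cannot be larger than the whole partition, but individual pages can end up much smaller than rowsPerPage.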



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
