gianm commented on code in PR #18144:
URL: https://github.com/apache/druid/pull/18144#discussion_r2150883862


##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java:
##########
@@ -215,23 +172,11 @@ public void startAsync()
       throw new ISE("Cannot start from state[%s]", state);
     }
 
-    final StageDefinition stageDef = workOrder.getStageDefinition();
-
     try {
       exec.registerCancellationId(cancellationId);
-      makeInputSliceReader();
-      makeWorkOutputChannelFactory();
-      makeShuffleOutputChannelFactory();
-      makeAndRunWorkProcessors();
-
-      if (stageDef.doesShuffle()) {
-        makeAndRunShuffleProcessors();
-      } else {
-        // No shuffling: work output _is_ stage output. Retain read-only versions to reduce memory footprint.
-        stageOutputChannelsFuture =
-            Futures.immediateFuture(workResultAndOutputChannels.getOutputChannels().readOnly());
-      }
-
+      initInputSliceReader();
+      initGlobalSortPartitionBoundariesIfNeeded();

Review Comment:
   Do you mean `initGlobalSortPartitionBoundariesIfNeeded()`?
   
   The situation with global sort boundaries is definitely a little weird. The 
flow of partitioned global sort is like this:
   
   - Each worker gathers statistics from its input as it is read. While doing 
so, it buffers the input.
   - Each worker sends those statistics to the controller when done reading 
input.
   - The controller gathers statistics from each worker, uses them to determine 
good global partition boundaries, then sends those boundaries to all workers. 
(Crucially, each worker gets the same set of boundaries.)
   - The workers partition data according to the boundaries and sort within 
each partition. When each partition is later merged across workers, this yields 
a total sort of the data.
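
   To make the four steps concrete, here is a toy sketch of the flow, not the 
MSQ implementation. Everything in it is an assumption made for illustration: 
the class `GlobalSortSketch` and its methods are hypothetical, keys are plain 
integers, and a worker's "statistics" are simply a sorted copy of its keys 
(the real statistics are a more compact summary).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical toy model of partitioned global sort; not Druid code.
final class GlobalSortSketch
{
  // Steps 1-2: the worker buffers its input and summarizes it for the controller.
  // Assumption: the "statistics" are just a sorted copy of the keys.
  static List<Integer> gatherStatistics(List<Integer> bufferedInput)
  {
    List<Integer> stats = new ArrayList<>(bufferedInput);
    Collections.sort(stats);
    return stats;
  }

  // Step 3: the controller combines all statistics and picks numPartitions - 1
  // split points. Every worker is then given this same list.
  static List<Integer> chooseBoundaries(List<List<Integer>> statsFromWorkers, int numPartitions)
  {
    List<Integer> all = new ArrayList<>();
    for (List<Integer> stats : statsFromWorkers) {
      all.addAll(stats);
    }
    Collections.sort(all);
    List<Integer> boundaries = new ArrayList<>();
    if (all.isEmpty()) {
      return boundaries; // no data: a single partition is enough
    }
    for (int i = 1; i < numPartitions; i++) {
      boundaries.add(all.get(i * all.size() / numPartitions));
    }
    return boundaries;
  }

  // Step 4 (worker side): route each key to the partition whose range contains
  // it, then sort within each partition.
  static List<List<Integer>> partitionAndSort(List<Integer> bufferedInput, List<Integer> boundaries)
  {
    List<List<Integer>> partitions = new ArrayList<>();
    for (int i = 0; i <= boundaries.size(); i++) {
      partitions.add(new ArrayList<>());
    }
    for (int key : bufferedInput) {
      int p = 0;
      while (p < boundaries.size() && key >= boundaries.get(p)) {
        p++;
      }
      partitions.get(p).add(key);
    }
    for (List<Integer> partition : partitions) {
      Collections.sort(partition);
    }
    return partitions;
  }

  // Step 4 (merge side): merge partition p across workers, then concatenate the
  // partitions in order. Sorting stands in for a streaming merge of sorted runs.
  static List<Integer> mergeAcrossWorkers(List<List<List<Integer>>> perWorkerPartitions)
  {
    List<Integer> result = new ArrayList<>();
    int numPartitions = perWorkerPartitions.get(0).size();
    for (int p = 0; p < numPartitions; p++) {
      List<Integer> merged = new ArrayList<>();
      for (List<List<Integer>> workerPartitions : perWorkerPartitions) {
        merged.addAll(workerPartitions.get(p));
      }
      Collections.sort(merged);
      result.addAll(merged);
    }
    return result;
  }
}
```

   The total order only holds because every worker partitions with the same 
boundaries: partition 0 on one worker then covers exactly the same key range as 
partition 0 on every other worker.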
   
   This means that some stuff related to global partition boundaries is going 
to leak into the `ExecutionContext`:
   
   - there has to be a way for the worker to send its statistics 
(`onDoneReadingInput`)
   - there has to be a way for the worker to receive the global partition 
boundaries (`globalClusterByPartitions`)
   
   Coming back to the `initGlobalSortPartitionBoundariesIfNeeded` you 
highlighted: this is for the case where the controller isn't involved, possibly 
because there's only one partition. In this case the worker neither sends 
statistics to the controller nor receives global boundaries from it. It can 
determine its boundaries on its own. (They will just be "all data goes to 
partition 0".)
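
   As a rough sketch of that local decision (hypothetical names throughout, 
and assuming that "only one partition" is the sole condition, which the real 
method may not match): with one partition there are no split points, so the 
worker can produce the boundaries itself without a round trip to the 
controller.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration; not the RunWorkOrder implementation.
final class LocalBoundariesSketch
{
  // Returns boundaries the worker can decide on its own, or empty if it must
  // wait for the controller. Assumption: one partition is the only local case.
  static Optional<List<Integer>> boundariesWithoutController(int numPartitions)
  {
    if (numPartitions == 1) {
      // No split points: every key falls into partition 0.
      return Optional.of(Collections.<Integer>emptyList());
    }
    return Optional.empty();
  }

  // Index of the partition a key belongs to, given ascending split points.
  static int partitionFor(int key, List<Integer> boundaries)
  {
    int p = 0;
    while (p < boundaries.size() && key >= boundaries.get(p)) {
      p++;
    }
    return p;
  }
}
```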


