gianm commented on code in PR #18144:
URL: https://github.com/apache/druid/pull/18144#discussion_r2150883862
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/RunWorkOrder.java:
##########
@@ -215,23 +172,11 @@ public void startAsync()
throw new ISE("Cannot start from state[%s]", state);
}
- final StageDefinition stageDef = workOrder.getStageDefinition();
-
try {
exec.registerCancellationId(cancellationId);
- makeInputSliceReader();
- makeWorkOutputChannelFactory();
- makeShuffleOutputChannelFactory();
- makeAndRunWorkProcessors();
-
- if (stageDef.doesShuffle()) {
- makeAndRunShuffleProcessors();
- } else {
- // No shuffling: work output _is_ stage output. Retain read-only versions to reduce memory footprint.
- stageOutputChannelsFuture =
- Futures.immediateFuture(workResultAndOutputChannels.getOutputChannels().readOnly());
- }
-
+ initInputSliceReader();
+ initGlobalSortPartitionBoundariesIfNeeded();
Review Comment:
Do you mean the `initGlobalSortPartitionBoundariesIfNeeded()`?
The situation with global sort boundaries is definitely a little weird. The
flow of partitioned global sort is like this:
- Each worker gathers statistics from its input as it is read. While doing so, it buffers the input.
- Each worker sends those statistics to the controller when done reading
input.
- The controller gathers statistics from each worker, uses them to determine good global partition boundaries, and then sends those boundaries to all workers. (Crucially, each worker gets the same set of boundaries.)
- The workers partition data according to the boundaries and sort within each partition. Because every worker used the same boundaries, merging each partition across workers and taking the partitions in order yields a total sort of the data.
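To make the flow above concrete, here is a toy, single-process simulation in Java. It is not MSQ code: the "statistics" are simply each worker's sorted keys, the controller picks quantile boundaries, and every class and method name here (`GlobalSortSketch`, `chooseBoundaries`, and so on) is hypothetical. What it does show is why the shared boundaries matter: partition `p` covers the same key range on every worker, so merging each partition across workers and concatenating the partitions in order gives a total sort.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy simulation of a partitioned global sort; all names are hypothetical, not MSQ's. */
class GlobalSortSketch {

  /** Worker step 1: gather "statistics" (here, simply the sorted keys) while reading input. */
  static int[] gatherStatistics(int[] input) {
    int[] stats = input.clone();
    Arrays.sort(stats);
    return stats;
  }

  /** Controller step: merge all statistics and pick numPartitions - 1 quantile boundaries. */
  static int[] chooseBoundaries(List<int[]> allStats, int numPartitions) {
    // Assumes at least one row overall when numPartitions > 1.
    int[] merged = allStats.stream().flatMapToInt(Arrays::stream).sorted().toArray();
    int[] boundaries = new int[numPartitions - 1];
    for (int i = 1; i < numPartitions; i++) {
      boundaries[i - 1] = merged[(int) ((long) i * merged.length / numPartitions)];
    }
    return boundaries;
  }

  /** Worker step 2: route rows by the shared boundaries, then sort within each partition. */
  static List<List<Integer>> partitionAndSort(int[] input, int[] boundaries) {
    List<List<Integer>> partitions = new ArrayList<>();
    for (int i = 0; i <= boundaries.length; i++) {
      partitions.add(new ArrayList<>());
    }
    for (int key : input) {
      // Partition p holds keys in [boundaries[p - 1], boundaries[p]).
      int p = 0;
      while (p < boundaries.length && key >= boundaries[p]) {
        p++;
      }
      partitions.get(p).add(key);
    }
    partitions.forEach(part -> part.sort(null));
    return partitions;
  }

  /** Merge step: combine partition 0 from all workers, then partition 1, and so on. */
  static List<Integer> mergeAcrossWorkers(List<List<List<Integer>>> perWorker, int numPartitions) {
    List<Integer> out = new ArrayList<>();
    for (int p = 0; p < numPartitions; p++) {
      List<Integer> merged = new ArrayList<>();
      for (List<List<Integer>> worker : perWorker) {
        merged.addAll(worker.get(p));
      }
      // A real merge would k-way merge the already-sorted runs; re-sorting gives the same result.
      merged.sort(null);
      out.addAll(merged);
    }
    return out;
  }

  static List<Integer> run(int[][] workerInputs, int numPartitions) {
    List<int[]> stats = new ArrayList<>();
    for (int[] input : workerInputs) {
      stats.add(gatherStatistics(input));
    }
    int[] boundaries = chooseBoundaries(stats, numPartitions); // identical for every worker
    List<List<List<Integer>>> perWorker = new ArrayList<>();
    for (int[] input : workerInputs) {
      perWorker.add(partitionAndSort(input, boundaries));
    }
    return mergeAcrossWorkers(perWorker, numPartitions);
  }

  public static void main(String[] args) {
    int[][] inputs = {{9, 1, 7, 3}, {8, 2, 6, 4}, {5, 0}};
    System.out.println(run(inputs, 3)); // prints [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  }
}
```

With these inputs the controller picks boundaries `[3, 6]`, so each worker splits its rows into `< 3`, `[3, 6)`, and `>= 6`.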
This means that some details related to global partition boundaries inevitably leak into the `ExecutionContext`:
- there has to be a way for the worker to send its statistics
(`onDoneReadingInput`)
- there has to be a way for the worker to receive the global partition
boundaries (`globalClusterByPartitions`)
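A rough sketch of what those two hooks imply, assuming a much-simplified, hypothetical interface: `BoundaryHooks`, the `int[]` statistics and boundaries, the `CompletableFuture` return type, and `FakeController` are illustrative only, and the real `ExecutionContext` signatures of `onDoneReadingInput` and `globalClusterByPartitions` may look quite different. The fake controller completes one shared future after every worker has reported, which is what guarantees that all workers see the same boundaries.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Hypothetical, simplified boundary hooks. NOT Druid's actual ExecutionContext API. */
interface BoundaryHooks {
  /** Worker to controller: report statistics once all input has been read. */
  void onDoneReadingInput(int[] statistics);

  /** Controller to worker: completes once the global partition boundaries are known. */
  CompletableFuture<int[]> globalClusterByPartitions();
}

/** In-memory stand-in for the controller, shared by all workers. */
class FakeController implements BoundaryHooks {
  private final int expectedWorkers;
  private final List<Integer> collected = new ArrayList<>();
  private final CompletableFuture<int[]> boundaries = new CompletableFuture<>();
  private int reported = 0;

  FakeController(int expectedWorkers) {
    this.expectedWorkers = expectedWorkers;
  }

  @Override
  public synchronized void onDoneReadingInput(int[] statistics) {
    for (int key : statistics) {
      collected.add(key);
    }
    reported++;
    if (reported == expectedWorkers) {
      // Toy rule: two partitions, split at the median of every reported key.
      List<Integer> sorted = new ArrayList<>(collected);
      sorted.sort(null);
      boundaries.complete(new int[] {sorted.get(sorted.size() / 2)});
    }
  }

  @Override
  public CompletableFuture<int[]> globalClusterByPartitions() {
    // Every worker gets the same future, hence the same boundaries.
    return boundaries;
  }
}

class BoundaryHooksDemo {
  /** One worker: send statistics, then wait for the shared boundaries. */
  static int[] runWorker(BoundaryHooks ctx, int[] input) {
    ctx.onDoneReadingInput(input.clone()); // toy "statistics": the raw keys
    return ctx.globalClusterByPartitions().join();
  }

  /** Runs all workers concurrently against one fake controller; returns each one's boundaries. */
  static int[][] runWorkers(int[][] inputs) {
    FakeController controller = new FakeController(inputs.length);
    ExecutorService pool = Executors.newFixedThreadPool(inputs.length);
    try {
      List<CompletableFuture<int[]>> futures = new ArrayList<>();
      for (int[] input : inputs) {
        futures.add(CompletableFuture.supplyAsync(() -> runWorker(controller, input), pool));
      }
      int[][] results = new int[inputs.length][];
      for (int i = 0; i < inputs.length; i++) {
        results[i] = futures.get(i).join();
      }
      return results;
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    int[][] results = runWorkers(new int[][] {{5, 1, 9}, {2, 8, 4}});
    for (int[] b : results) {
      System.out.println(Arrays.toString(b)); // prints [5] for both workers
    }
  }
}
```

A worker that calls `join()` before the others have reported simply blocks until the last report arrives, which mirrors the "buffer input, then wait for boundaries" step in the flow.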
Coming back to the `initGlobalSortPartitionBoundariesIfNeeded()` you highlighted: it handles the case where the controller isn't involved, possibly because there's only one partition. In this case, the worker neither sends statistics to the controller nor receives global boundaries from it. It can determine its boundaries on its own (they are simply "all data goes to partition 0").
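For contrast, a hypothetical sketch of that local case: when the controller is not involved, the worker completes its boundaries immediately with an empty boundary list, meaning a single partition that holds every key. `LocalBoundariesSketch`, `initBoundaries`, and `partitionFor` are made-up names for illustration, not the actual body of `initGlobalSortPartitionBoundariesIfNeeded()`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch; not the real initGlobalSortPartitionBoundariesIfNeeded(). */
class LocalBoundariesSketch {
  /** No boundaries means one partition covering all keys. */
  static final int[] SINGLE_PARTITION = new int[0];

  /** When the controller is not involved, the worker fills in its boundaries itself. */
  static CompletableFuture<int[]> initBoundaries(
      boolean controllerInvolved,
      Supplier<CompletableFuture<int[]>> fromController) {
    if (!controllerInvolved) {
      // Nothing to send and nothing to wait for: all data goes to partition 0.
      return CompletableFuture.completedFuture(SINGLE_PARTITION);
    }
    return fromController.get();
  }

  /** Partition number for a key, given sorted range boundaries. */
  static int partitionFor(int key, int[] boundaries) {
    int p = 0;
    while (p < boundaries.length && key >= boundaries[p]) {
      p++;
    }
    return p;
  }

  public static void main(String[] args) {
    int[] boundaries = initBoundaries(false, () -> {
      throw new IllegalStateException("controller should not be contacted");
    }).join();
    for (int key : new int[] {-3, 0, 42}) {
      // Every key maps to partition 0.
      System.out.println(key + " -> partition " + partitionFor(key, boundaries));
    }
  }
}
```

In this sketch, returning an already-completed future keeps the downstream code path the same whether the boundaries came from the controller or were computed locally.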
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.