This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 6275277ba73 MSQ: Use full parallelism in localSort. (#18765)
6275277ba73 is described below
commit 6275277ba731443f2324ef2f0ae57b47b475a685
Author: Gian Merlino <[email protected]>
AuthorDate: Sun Dec 7 21:05:26 2025 -0800
MSQ: Use full parallelism in localSort. (#18765)
Since localSort was introduced for sort-merge join in #13506, it has
used 1 processor at a time with a max of 2 channels per processor.
This is inefficient, because at the time the sorter runs, it is the
only sorter running. The patch contains a comment describing the
situation in more detail.
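To illustrate the difference between "1 processor at a time" and full parallelism, here is a minimal sketch under simplifying assumptions. It is not SuperSorter's code: the class `ParallelRunSort`, the method `sortRuns`, and the use of a fixed-size `ExecutorService` are hypothetical stand-ins. Independent sorted runs are produced by up to `concurrentProcessors` threads at once, and setting that value to 1 reproduces the old sequential behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration only; not Druid's SuperSorter.
final class ParallelRunSort {
  // Sorts each run independently, using at most `concurrentProcessors` threads.
  // With concurrentProcessors = 1, runs are sorted strictly one after another.
  static List<int[]> sortRuns(List<int[]> runs, int concurrentProcessors) {
    ExecutorService pool = Executors.newFixedThreadPool(concurrentProcessors);
    try {
      List<Future<int[]>> futures = new ArrayList<>();
      for (int[] run : runs) {
        // Each task sorts a private copy so the caller's arrays are untouched.
        futures.add(pool.submit(() -> {
          int[] copy = run.clone();
          Arrays.sort(copy);
          return copy;
        }));
      }
      List<int[]> sorted = new ArrayList<>();
      for (Future<int[]> f : futures) {
        sorted.add(f.get()); // collect results in the original run order
      }
      return sorted;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    } catch (ExecutionException e) {
      throw new RuntimeException(e.getCause());
    } finally {
      pool.shutdown();
    }
  }

  public static void main(String[] args) {
    List<int[]> runs = List.of(new int[]{5, 3, 9}, new int[]{8, 1}, new int[]{7, 2, 6});
    // "Full parallelism": one thread per available processor.
    int threads = Runtime.getRuntime().availableProcessors();
    for (int[] r : sortRuns(runs, threads)) {
      System.out.println(Arrays.toString(r));
    }
  }
}
```

The thread-pool size is the only knob here. That mirrors, loosely, how the patch swaps the hardcoded `1` for a value derived from the worker's memory parameters, which is safe only because no other sorter competes for the same threads.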
This patch has two benefits. First, the sorter should run faster
if multiple processing threads are available. Second, because the
sorter now allows a larger maximum number of channels per merger,
it will make more efficient use of intermediate channels.
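The second benefit can be seen with a simplified cost model. Assume each merge pass reads every remaining run, merges groups of at most `maxChannelsPerMerger` runs, and writes the results to intermediate channels. Under that model, the number of passes is the base-`maxChannelsPerMerger` logarithm of the run count, rounded up. The class `MergePasses` and the example numbers (64 runs, fan-in 2 vs. 16) are hypothetical. They are not the values Druid computes, and SuperSorter's actual merge scheduling may differ.

```java
// Hypothetical cost model; not Druid's SuperSorter merge logic.
final class MergePasses {
  // Counts merge passes needed to reduce `runs` sorted runs to one when each
  // merger reads at most `maxChannelsPerMerger` input channels.
  static int mergePasses(int runs, int maxChannelsPerMerger) {
    if (maxChannelsPerMerger < 2) {
      throw new IllegalArgumentException("fan-in must be at least 2");
    }
    int passes = 0;
    int remaining = runs;
    while (remaining > 1) {
      // Each pass turns every group of up to maxChannelsPerMerger runs into one run.
      remaining = (remaining + maxChannelsPerMerger - 1) / maxChannelsPerMerger;
      passes++;
    }
    return passes;
  }

  public static void main(String[] args) {
    int runs = 64;
    System.out.println("fan-in 2:  " + mergePasses(runs, 2) + " passes");
    System.out.println("fan-in 16: " + mergePasses(runs, 16) + " passes");
  }
}
```

With 64 runs, a fan-in of 2 needs 6 passes (64, 32, 16, 8, 4, 2, 1), while a fan-in of 16 needs 2 (64, 4, 1). Every pass except the last writes the full data set to intermediate storage, so fewer passes means less intermediate I/O.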
---
.../druid/msq/exec/std/StandardShuffleOperations.java | 15 +++++++++++++--
1 file changed, 13 insertions(+), 2 deletions(-)
diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardShuffleOperations.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardShuffleOperations.java
index d46f30be00e..d3daec7d606 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardShuffleOperations.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardShuffleOperations.java
@@ -364,6 +364,7 @@ public class StandardShuffleOperations
nextFuture = Futures.transformAsync(
nextFuture,
ignored -> {
+ final WorkerMemoryParameters memoryParameters = executionContext.frameContext().memoryParameters();
final SuperSorter sorter = new SuperSorter(
Collections.singletonList(channel.getReadableChannel()),
stageDefinition.getFrameReader(),
@@ -382,8 +383,18 @@ public class StandardShuffleOperations
executionContext.makeIntermediateOutputChannelFactory(
StringUtils.format("hash-parts-super-sort-%06d", channel.getPartitionNumber())),
executionContext.frameContext().frameWriterSpec().getRowBasedFrameType(),
- 1,
- 2,
+
+ // Use full parallelism, since at the time this sorter runs, it is the only sorter running.
+ //
+ // Typically, nothing else is running at all. Whenever there is more than one output partition,
+ // the step prior to this localSort (typically hashPartition) should use buffered output channels
+ // and therefore would have exited.
+ //
+ // In the case where there is one output partition, the step prior to this localSort may run
+ // concurrently with the sorter, but in that case it will only have one output channel so won't
+ // have many frames buffered.
+ memoryParameters.getSuperSorterConcurrentProcessors(),
+ memoryParameters.getSuperSorterMaxChannelsPerMerger(),
ShuffleSpec.UNLIMITED,
executionContext.cancellationId(),