This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 6275277ba73 MSQ: Use full parallelism in localSort. (#18765)
6275277ba73 is described below

commit 6275277ba731443f2324ef2f0ae57b47b475a685
Author: Gian Merlino <[email protected]>
AuthorDate: Sun Dec 7 21:05:26 2025 -0800

    MSQ: Use full parallelism in localSort. (#18765)
    
    Since localSort was introduced for sort-merge join in #13506, it has
    used 1 processor at a time with a max of 2 channels per processor.
    This is inefficient, because at the time the sorter runs, it is the
    only sorter running. The patch contains a comment describing the
    situation in more detail.
    
    This patch has two benefits. First, the sorter should run faster
    if multiple processing threads are available. Second, the sorter,
    due to having a larger max channels per merger, will make more
    efficient use of intermediate channels.
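
For a rough sense of why a larger max channels per merger helps, here is a minimal sketch. It is not Druid's SuperSorter. It assumes a simplified model in which each merge pass combines up to fanIn sorted runs into one, and each pass writes its output to intermediate channels. The class name, the method name, and the example values (64 runs, fan-in of 8) are hypothetical; the real limits come from WorkerMemoryParameters and depend on available memory.

```java
/** Simplified model of a multi-pass k-way merge. Hypothetical illustration; not Druid code. */
final class MergePassSketch
{
  private MergePassSketch()
  {
    // No instantiation.
  }

  /**
   * Returns the number of merge passes needed to reduce {@code sortedRuns} sorted runs to a single run
   * when each merger reads at most {@code fanIn} inputs at a time.
   */
  static int mergePasses(final int sortedRuns, final int fanIn)
  {
    if (sortedRuns < 1) {
      throw new IllegalArgumentException("sortedRuns must be at least 1");
    }
    if (fanIn < 2) {
      throw new IllegalArgumentException("fanIn must be at least 2");
    }

    int runs = sortedRuns;
    int passes = 0;
    while (runs > 1) {
      // Each pass merges groups of up to fanIn runs; ceiling division gives the number of runs left.
      runs = (runs + fanIn - 1) / fanIn;
      passes++;
    }
    return passes;
  }

  public static void main(String[] args)
  {
    System.out.println("fanIn=2: " + mergePasses(64, 2) + " passes"); // prints "fanIn=2: 6 passes"
    System.out.println("fanIn=8: " + mergePasses(64, 8) + " passes"); // prints "fanIn=8: 2 passes"
  }
}
```

In this model, 64 runs need 6 passes at a fan-in of 2 but only 2 passes at a fan-in of 8, so far fewer intermediate channels are written and read back.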
---
 .../druid/msq/exec/std/StandardShuffleOperations.java     | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardShuffleOperations.java b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardShuffleOperations.java
index d46f30be00e..d3daec7d606 100644
--- a/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardShuffleOperations.java
+++ b/multi-stage-query/src/main/java/org/apache/druid/msq/exec/std/StandardShuffleOperations.java
@@ -364,6 +364,7 @@ public class StandardShuffleOperations
             nextFuture = Futures.transformAsync(
                 nextFuture,
                 ignored -> {
+                  final WorkerMemoryParameters memoryParameters = executionContext.frameContext().memoryParameters();
                   final SuperSorter sorter = new SuperSorter(
                       Collections.singletonList(channel.getReadableChannel()),
                       stageDefinition.getFrameReader(),
@@ -382,8 +383,18 @@ public class StandardShuffleOperations
                       executionContext.makeIntermediateOutputChannelFactory(
                           StringUtils.format("hash-parts-super-sort-%06d", channel.getPartitionNumber())),
                       executionContext.frameContext().frameWriterSpec().getRowBasedFrameType(),
-                      1,
-                      2,
+
+                      // Use full parallelism, since at the time this sorter runs, it is the only sorter running.
+                      //
+                      // Typically, nothing else is running at all. Whenever there is more than one output partition,
+                      // the step prior to this localSort (typically hashPartition) should use buffered output channels
+                      // and therefore would have exited.
+                      //
+                      // In the case where there is one output partition, the step prior to this localSort may run
+                      // concurrently with the sorter, but in that case it will only have one output channel so won't
+                      // have many frames buffered.
+                      memoryParameters.getSuperSorterConcurrentProcessors(),
+                      memoryParameters.getSuperSorterMaxChannelsPerMerger(),
                       ShuffleSpec.UNLIMITED,
                       executionContext.cancellationId(),
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
