This is an automated email from the ASF dual-hosted git repository.

HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8115d05b1f6 [Exec](colocate) disable colocate group execution in 
bucket shuffle join (#63200)
8115d05b1f6 is described below

commit 8115d05b1f6cf4ab53184bd8e3328aa72f0bcb64
Author: HappenLee <[email protected]>
AuthorDate: Fri May 15 14:21:05 2026 +0800

    [Exec](colocate) disable colocate group execution in bucket shuffle join 
(#63200)
    
    ### What problem does this PR solve?
     Issue Number: None
     Related PR: #63062
     Problem Summary:
    Disable colocate-style parallelization for bucket shuffle join
    fragments. When exchange nodes exist, `UnassignedScanBucketOlapTableJob`
    now falls back to the base degree-of-parallelism logic instead of using
    the tablet-based colocate strategy, which avoids over-parallelizing
    join fragments. The colocate path is still kept for pure colocate scans
    without exchange nodes.
---
 .../job/UnassignedScanBucketOlapTableJob.java      |   7 ++
 .../job/UnassignedScanBucketOlapTableJobTest.java  | 117 +++++++++++++++++++++
 2 files changed, 124 insertions(+)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index 6a2315966bc..b6a450c93a1 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -523,6 +523,13 @@ public class UnassignedScanBucketOlapTableJob extends 
AbstractUnassignedScanJob
             }
         }
 
+        // The tablet-based parallelism strategy only applies when the 
fragment has no exchange nodes
+        // (e.g. pure colocate scan). When exchange nodes are present (e.g. 
bucket shuffle join),
+        // fall back to the base class behavior to avoid over-parallelizing 
the join fragment.
+        if (!exchangeToChildJob.isEmpty()) {
+            return super.degreeOfParallelism(maxParallel, 
useLocalShuffleToAddParallel);
+        }
+
         long tabletNum = 0;
         for (ScanNode scanNode : scanNodes) {
             if (scanNode instanceof OlapScanNode) {
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJobTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJobTest.java
new file mode 100644
index 00000000000..db7f55fb2fd
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJobTest.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.nereids.StatementContext;
+import 
org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.BitSet;
+
+public class UnassignedScanBucketOlapTableJobTest {
+
+    @Test
+    public void testDegreeOfParallelismWithExchangeNodes() {
+        ConnectContext connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+        connectContext.setQueryId(new TUniqueId(1, 1));
+        connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+        connectContext.getSessionVariable().colocateMaxParallelNum = 128;
+        StatementContext statementContext = new StatementContext(
+                connectContext, new OriginStatement("select * from t", 0));
+        connectContext.setStatementContext(statementContext);
+
+        OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+        Mockito.when(olapScanNode.getTotalTabletsNum()).thenReturn(100L);
+
+        PlanFragment fragment = Mockito.mock(PlanFragment.class);
+        
Mockito.when(fragment.getDataPartition()).thenReturn(DataPartition.RANDOM);
+        Mockito.when(fragment.getParallelExecNum()).thenReturn(5);
+
+        ScanWorkerSelector scanWorkerSelector = 
Mockito.mock(ScanWorkerSelector.class);
+
+        // Non-empty exchangeToChildJob simulates bucket shuffle join fragment.
+        ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
+        UnassignedJob mockChild = Mockito.mock(UnassignedJob.class);
+        Mockito.when(mockChild.getAllChildrenTypes()).thenReturn(new BitSet());
+        ArrayListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob = 
ArrayListMultimap.create();
+        exchangeToChildJob.put(exchangeNode, mockChild);
+
+        UnassignedScanBucketOlapTableJob unassignedJob = new 
UnassignedScanBucketOlapTableJob(
+                statementContext,
+                fragment,
+                ImmutableList.of(olapScanNode),
+                exchangeToChildJob,
+                scanWorkerSelector
+        );
+
+        // maxParallel = 3 (buckets), parallelExecNum = 5
+        // Base class: min(maxParallel, max(parallelExecNum, 1)) = min(3, 5) = 
3
+        int result = unassignedJob.degreeOfParallelism(3, false);
+        Assertions.assertEquals(3, result);
+    }
+
+    @Test
+    public void testDegreeOfParallelismWithoutExchangeNodes() {
+        ConnectContext connectContext = new ConnectContext();
+        connectContext.setThreadLocalInfo();
+        connectContext.setQueryId(new TUniqueId(2, 2));
+        connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+        connectContext.getSessionVariable().colocateMaxParallelNum = 128;
+        StatementContext statementContext = new StatementContext(
+                connectContext, new OriginStatement("select * from t", 0));
+        connectContext.setStatementContext(statementContext);
+
+        OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+        Mockito.when(olapScanNode.getTotalTabletsNum()).thenReturn(100L);
+        
Mockito.when(olapScanNode.shouldUseOneInstance(Mockito.any())).thenReturn(false);
+
+        PlanFragment fragment = Mockito.mock(PlanFragment.class);
+        
Mockito.when(fragment.getDataPartition()).thenReturn(DataPartition.RANDOM);
+        Mockito.when(fragment.getParallelExecNum()).thenReturn(5);
+
+        ScanWorkerSelector scanWorkerSelector = 
Mockito.mock(ScanWorkerSelector.class);
+
+        // Empty exchangeToChildJob simulates pure colocate scan (no exchange 
nodes).
+        ArrayListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob = 
ArrayListMultimap.create();
+
+        UnassignedScanBucketOlapTableJob unassignedJob = new 
UnassignedScanBucketOlapTableJob(
+                statementContext,
+                fragment,
+                ImmutableList.of(olapScanNode),
+                exchangeToChildJob,
+                scanWorkerSelector
+        );
+
+        // Tablet strategy: min(max(tabletNum=100, parallelExecNum=5), 
colocateMaxParallelNum=128) = 100
+        int result = unassignedJob.degreeOfParallelism(3, false);
+        Assertions.assertEquals(100, result);
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to