This is an automated email from the ASF dual-hosted git repository.
HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 8115d05b1f6 [Exec](colocate) disable colocate group execution in
bucket shuffle join (#63200)
8115d05b1f6 is described below
commit 8115d05b1f6cf4ab53184bd8e3328aa72f0bcb64
Author: HappenLee <[email protected]>
AuthorDate: Fri May 15 14:21:05 2026 +0800
[Exec](colocate) disable colocate group execution in bucket shuffle join
(#63200)
### What problem does this PR solve?
Issue Number: None
Related PR: #63062
Problem Summary:
Disable colocate-style (tablet-based) parallelization for bucket shuffle join
fragments. When a fragment contains exchange nodes, `UnassignedScanBucketOlapTableJob`
now falls back to the base degree-of-parallelism logic instead of using the
tablet-based colocate strategy, which avoids over-parallelizing join fragments.
Pure colocate scans without exchange nodes still use the tablet-based colocate
path.
---
.../job/UnassignedScanBucketOlapTableJob.java | 7 ++
.../job/UnassignedScanBucketOlapTableJobTest.java | 117 +++++++++++++++++++++
2 files changed, 124 insertions(+)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index 6a2315966bc..b6a450c93a1 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -523,6 +523,13 @@ public class UnassignedScanBucketOlapTableJob extends
AbstractUnassignedScanJob
}
}
+ // The tablet-based parallelism strategy only applies when the
fragment has no exchange nodes
+ // (e.g. pure colocate scan). When exchange nodes are present (e.g.
bucket shuffle join),
+ // fall back to the base class behavior to avoid over-parallelizing
the join fragment.
+ if (!exchangeToChildJob.isEmpty()) {
+ return super.degreeOfParallelism(maxParallel,
useLocalShuffleToAddParallel);
+ }
+
long tabletNum = 0;
for (ScanNode scanNode : scanNodes) {
if (scanNode instanceof OlapScanNode) {
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJobTest.java
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJobTest.java
new file mode 100644
index 00000000000..db7f55fb2fd
--- /dev/null
+++
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJobTest.java
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.distribute.worker.job;
+
+import org.apache.doris.nereids.StatementContext;
+import
org.apache.doris.nereids.trees.plans.distribute.worker.ScanWorkerSelector;
+import org.apache.doris.planner.DataPartition;
+import org.apache.doris.planner.ExchangeNode;
+import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.planner.PlanFragment;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.OriginStatement;
+import org.apache.doris.thrift.TUniqueId;
+
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableList;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.BitSet;
+
+public class UnassignedScanBucketOlapTableJobTest {
+
+ @Test
+ public void testDegreeOfParallelismWithExchangeNodes() {
+ ConnectContext connectContext = new ConnectContext();
+ connectContext.setThreadLocalInfo();
+ connectContext.setQueryId(new TUniqueId(1, 1));
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+ connectContext.getSessionVariable().colocateMaxParallelNum = 128;
+ StatementContext statementContext = new StatementContext(
+ connectContext, new OriginStatement("select * from t", 0));
+ connectContext.setStatementContext(statementContext);
+
+ OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+ Mockito.when(olapScanNode.getTotalTabletsNum()).thenReturn(100L);
+
+ PlanFragment fragment = Mockito.mock(PlanFragment.class);
+
Mockito.when(fragment.getDataPartition()).thenReturn(DataPartition.RANDOM);
+ Mockito.when(fragment.getParallelExecNum()).thenReturn(5);
+
+ ScanWorkerSelector scanWorkerSelector =
Mockito.mock(ScanWorkerSelector.class);
+
+ // Non-empty exchangeToChildJob simulates bucket shuffle join fragment.
+ ExchangeNode exchangeNode = Mockito.mock(ExchangeNode.class);
+ UnassignedJob mockChild = Mockito.mock(UnassignedJob.class);
+ Mockito.when(mockChild.getAllChildrenTypes()).thenReturn(new BitSet());
+ ArrayListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
+ exchangeToChildJob.put(exchangeNode, mockChild);
+
+ UnassignedScanBucketOlapTableJob unassignedJob = new
UnassignedScanBucketOlapTableJob(
+ statementContext,
+ fragment,
+ ImmutableList.of(olapScanNode),
+ exchangeToChildJob,
+ scanWorkerSelector
+ );
+
+ // maxParallel = 3 (buckets), parallelExecNum = 5
+ // Base class: min(maxParallel, max(parallelExecNum, 1)) = min(3, 5) =
3
+ int result = unassignedJob.degreeOfParallelism(3, false);
+ Assertions.assertEquals(3, result);
+ }
+
+ @Test
+ public void testDegreeOfParallelismWithoutExchangeNodes() {
+ ConnectContext connectContext = new ConnectContext();
+ connectContext.setThreadLocalInfo();
+ connectContext.setQueryId(new TUniqueId(2, 2));
+ connectContext.getSessionVariable().parallelPipelineTaskNum = 1;
+ connectContext.getSessionVariable().colocateMaxParallelNum = 128;
+ StatementContext statementContext = new StatementContext(
+ connectContext, new OriginStatement("select * from t", 0));
+ connectContext.setStatementContext(statementContext);
+
+ OlapScanNode olapScanNode = Mockito.mock(OlapScanNode.class);
+ Mockito.when(olapScanNode.getTotalTabletsNum()).thenReturn(100L);
+
Mockito.when(olapScanNode.shouldUseOneInstance(Mockito.any())).thenReturn(false);
+
+ PlanFragment fragment = Mockito.mock(PlanFragment.class);
+
Mockito.when(fragment.getDataPartition()).thenReturn(DataPartition.RANDOM);
+ Mockito.when(fragment.getParallelExecNum()).thenReturn(5);
+
+ ScanWorkerSelector scanWorkerSelector =
Mockito.mock(ScanWorkerSelector.class);
+
+ // Empty exchangeToChildJob simulates pure colocate scan (no exchange
nodes).
+ ArrayListMultimap<ExchangeNode, UnassignedJob> exchangeToChildJob =
ArrayListMultimap.create();
+
+ UnassignedScanBucketOlapTableJob unassignedJob = new
UnassignedScanBucketOlapTableJob(
+ statementContext,
+ fragment,
+ ImmutableList.of(olapScanNode),
+ exchangeToChildJob,
+ scanWorkerSelector
+ );
+
+ // Tablet strategy: min(max(tabletNum=100, parallelExecNum=5),
colocateMaxParallelNum=128) = 100
+ int result = unassignedJob.degreeOfParallelism(3, false);
+ Assertions.assertEquals(100, result);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]