This is an automated email from the ASF dual-hosted git repository.

lihaopeng pushed a commit to branch tpc_preview6
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 2a6ca57964385931088cc5ab8d00ed3c45ac96e0
Author: happenlee <[email protected]>
AuthorDate: Mon Feb 9 20:53:33 2026 +0800

    change colocate execution parallel num
---
 .../job/UnassignedScanBucketOlapTableJob.java      | 36 ++++++++++++++++++++++
 .../java/org/apache/doris/qe/SessionVariable.java  |  4 +++
 2 files changed, 40 insertions(+)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
index a97315d8d80..6a2315966bc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/distribute/worker/job/UnassignedScanBucketOlapTableJob.java
@@ -505,4 +505,40 @@ public class UnassignedScanBucketOlapTableJob extends AbstractUnassignedScanJob
         }
         return workers;
     }
+
+    /**
+     * Computes how many parallel instances this bucket scan job should run with.
+     *
+     * <p>Resolution order:
+     * <ol>
+     *   <li>an unpartitioned fragment runs as a single instance;</li>
+     *   <li>a fragment carrying a query cache parameter uses {@code maxParallel} unchanged;</li>
+     *   <li>a lone {@link OlapScanNode} whose {@code shouldUseOneInstance} returns true runs as a
+     *       single instance;</li>
+     *   <li>otherwise the larger of the first OlapScanNode's total tablet count and the fragment's
+     *       parallel exec num is used, capped by the session variable
+     *       {@code colocate_max_parallel_num} and never lower than 1.</li>
+     * </ol>
+     *
+     * @param maxParallel must be positive; it is returned as is on the query cache path and is
+     *                    otherwise validated but not applied as a bound
+     * @param useLocalShuffleToAddParallel not consulted by this override
+     * @return the degree of parallelism, always at least 1
+     * @throws IllegalArgumentException if {@code maxParallel} is not positive
+     */
+    @Override
+    protected int degreeOfParallelism(int maxParallel, boolean useLocalShuffleToAddParallel) {
+        Preconditions.checkArgument(maxParallel > 0, "maxParallel must be positive");
+        if (!fragment.getDataPartition().isPartitioned()) {
+            return 1;
+        }
+        if (fragment.queryCacheParam != null) {
+            return maxParallel;
+        }
+
+        // Fetch the context once; both the single-instance check and the cap lookup need it,
+        // and either is skipped when no context is attached to the statement.
+        ConnectContext connectContext = statementContext.getConnectContext();
+        if (connectContext != null && scanNodes.size() == 1 && scanNodes.get(0) instanceof OlapScanNode) {
+            OlapScanNode onlyScanNode = (OlapScanNode) scanNodes.get(0);
+            if (onlyScanNode.shouldUseOneInstance(connectContext)) {
+                return 1;
+            }
+        }
+
+        // Only the first OlapScanNode contributes its tablet count.
+        long tabletNum = 0;
+        for (ScanNode scanNode : scanNodes) {
+            if (scanNode instanceof OlapScanNode) {
+                tabletNum = ((OlapScanNode) scanNode).getTotalTabletsNum();
+                break;
+            }
+        }
+
+        // Fallback mirrors the default value of SessionVariable.colocateMaxParallelNum.
+        long colocateMaxParallelNum = 128;
+        if (connectContext != null) {
+            colocateMaxParallelNum = connectContext.getSessionVariable().colocateMaxParallelNum;
+        }
+
+        // Apply the cap in long arithmetic before narrowing so a huge tablet count cannot wrap in
+        // the int cast, and keep at least one instance even when the session variable is <= 0.
+        long wantedParallelism = Math.max(tabletNum, fragment.getParallelExecNum());
+        return (int) Math.max(1L, Math.min(wantedParallelism, colocateMaxParallelNum));
+    }
 }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 02929b38868..de0480ec0b5 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -168,6 +168,7 @@ public class SessionVariable implements Serializable, Writable {
             "enable_distinct_streaming_agg_force_passthrough";
     public static final String ENABLE_BROADCAST_JOIN_FORCE_PASSTHROUGH = 
"enable_broadcast_join_force_passthrough";
     public static final String DISABLE_COLOCATE_PLAN = "disable_colocate_plan";
+    public static final String COLOCATE_MAX_PARALLEL_NUM = 
"colocate_max_parallel_num";
     public static final String ENABLE_BUCKET_SHUFFLE_JOIN = 
"enable_bucket_shuffle_join";
     public static final String PARALLEL_FRAGMENT_EXEC_INSTANCE_NUM = 
"parallel_fragment_exec_instance_num";
     public static final String PARALLEL_PIPELINE_TASK_NUM = 
"parallel_pipeline_task_num";
@@ -1291,6 +1292,9 @@ public class SessionVariable implements Serializable, Writable {
                         setter = "setFragmentInstanceNum", varType = 
VariableAnnotation.DEPRECATED)
     public int parallelExecInstanceNum = 8;
 
+    @VariableMgr.VarAttr(name = COLOCATE_MAX_PARALLEL_NUM, needForward = true, 
fuzzy = false)
+    public int colocateMaxParallelNum = 128;
+
     @VariableMgr.VarAttr(name = PARALLEL_PIPELINE_TASK_NUM, fuzzy = true, 
needForward = true,
                         setter = "setPipelineTaskNum")
     public int parallelPipelineTaskNum = 0;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to