This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new 962222e8426 branch-4.0: [Fix](Cloud)decouple min pipeline executor 
size from ConnectContext #60958 (#61005)
962222e8426 is described below

commit 962222e842657f175d79740d68aa0b8d168b9aac
Author: Calvin Kirs <[email protected]>
AuthorDate: Wed Mar 4 10:21:06 2026 +0800

    branch-4.0: [Fix](Cloud)decouple min pipeline executor size from 
ConnectContext #60958 (#61005)
    
    #60958
---
 .../doris/cloud/system/CloudSystemInfoService.java | 12 +----
 .../apache/doris/datasource/ExternalScanNode.java  | 12 +++--
 .../apache/doris/datasource/FileQueryScanNode.java | 13 +++--
 .../org/apache/doris/datasource/FileScanNode.java  | 12 +++--
 .../doris/source/RemoteDorisScanNode.java          |  6 ++-
 .../doris/datasource/es/source/EsScanNode.java     |  5 +-
 .../doris/datasource/hive/source/HiveScanNode.java | 13 ++---
 .../doris/datasource/hudi/source/HudiScanNode.java |  6 ++-
 .../iceberg/rewrite/RewriteGroupTask.java          |  3 +-
 .../datasource/iceberg/source/IcebergScanNode.java | 13 +++--
 .../doris/datasource/jdbc/source/JdbcScanNode.java | 16 ++++--
 .../lakesoul/source/LakeSoulScanNode.java          |  7 +--
 .../maxcompute/source/MaxComputeScanNode.java      | 13 +++--
 .../doris/datasource/odbc/source/OdbcScanNode.java | 11 ++--
 .../datasource/paimon/source/PaimonScanNode.java   | 10 ++--
 .../source/TrinoConnectorScanNode.java             |  7 +--
 .../datasource/tvf/source/MetadataScanNode.java    |  6 ++-
 .../doris/datasource/tvf/source/TVFScanNode.java   |  8 +--
 .../org/apache/doris/nereids/cost/CostModel.java   |  3 +-
 .../glue/translator/PhysicalPlanTranslator.java    | 43 +++++++++-------
 .../glue/translator/PlanTranslatorContext.java     | 21 ++++++++
 .../nereids/load/NereidsLoadingTaskPlanner.java    |  6 ++-
 .../nereids/load/NereidsStreamLoadPlanner.java     |  6 ++-
 .../properties/ChildrenPropertiesRegulator.java    |  7 +--
 .../doris/nereids/rules/rewrite/SaltJoin.java      |  4 +-
 .../planner/BackendPartitionedSchemaScanNode.java  |  5 +-
 .../java/org/apache/doris/planner/CTEScanNode.java |  4 +-
 .../org/apache/doris/planner/DataGenScanNode.java  | 10 ++--
 .../org/apache/doris/planner/FileLoadScanNode.java |  4 +-
 .../apache/doris/planner/GroupCommitScanNode.java  |  5 +-
 .../org/apache/doris/planner/OlapScanNode.java     |  9 ++--
 .../org/apache/doris/planner/PlanFragment.java     |  6 ++-
 .../java/org/apache/doris/planner/ScanContext.java | 58 ++++++++++++++++++++++
 .../java/org/apache/doris/planner/ScanNode.java    | 27 ++++++++--
 .../org/apache/doris/planner/SchemaScanNode.java   |  5 +-
 .../java/org/apache/doris/qe/SessionVariable.java  | 37 ++++++++++++--
 .../java/org/apache/doris/qe/StmtExecutor.java     |  4 +-
 .../org/apache/doris/system/SystemInfoService.java |  4 +-
 .../tablefunction/DataGenTableValuedFunction.java  |  4 +-
 .../ExternalFileTableValuedFunction.java           |  5 +-
 .../GroupCommitTableValuedFunction.java            |  4 +-
 .../tablefunction/JdbcQueryTableValueFunction.java |  4 +-
 .../tablefunction/MetadataTableValuedFunction.java |  4 +-
 .../cloud/system/CloudSystemInfoServiceTest.java   | 30 +++++------
 .../doris/datasource/FileQueryScanNodeTest.java    |  5 +-
 .../datasource/hive/source/HiveScanNodeTest.java   |  5 +-
 .../iceberg/source/IcebergScanNodeTest.java        |  3 +-
 .../paimon/source/PaimonScanNodeTest.java          |  6 ++-
 .../tvf/source/MetadataScanNodeTest.java           |  7 +--
 .../datasource/tvf/source/TVFScanNodeTest.java     |  3 +-
 .../org/apache/doris/qe/HmsQueryCacheTest.java     |  3 +-
 .../org/apache/doris/qe/OlapQueryCacheTest.java    |  5 +-
 .../apache/doris/system/SystemInfoServiceTest.java | 12 ++---
 53 files changed, 379 insertions(+), 162 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
index 8103a9ecfd8..5d62986cff7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/cloud/system/CloudSystemInfoService.java
@@ -772,16 +772,8 @@ public class CloudSystemInfoService extends 
SystemInfoService {
     }
 
     @Override
-    public int getMinPipelineExecutorSize() {
-        String clusterName = "";
-        try {
-            clusterName = ConnectContext.get().getCloudCluster(false);
-        } catch (ComputeGroupException e) {
-            LOG.warn("failed to get cluster name", e);
-            return 1;
-        }
-        if (ConnectContext.get() != null
-                && Strings.isNullOrEmpty(clusterName)) {
+    public int getMinPipelineExecutorSize(String clusterName) {
+        if (Strings.isNullOrEmpty(clusterName)) {
             return 1;
         }
         List<Backend> currentBackends = getBackendsByClusterName(clusterName);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
index ac805c33508..8829f1fdbdd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
@@ -50,9 +51,14 @@ public abstract class ExternalScanNode extends ScanNode {
             ? new 
FederationBackendPolicy(NodeSelectionStrategy.CONSISTENT_HASHING)
             : new FederationBackendPolicy();
 
-    public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName, StatisticalType statisticalType,
-            boolean needCheckColumnPriv) {
-        super(id, desc, planNodeName, statisticalType);
+    public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
+            ScanContext scanContext, boolean needCheckColumnPriv) {
+        this(id, desc, planNodeName, StatisticalType.DEFAULT, scanContext, 
needCheckColumnPriv);
+    }
+
+    public ExternalScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
+            StatisticalType statisticalType, ScanContext scanContext, boolean 
needCheckColumnPriv) {
+        super(id, desc, planNodeName, scanContext, statisticalType);
         this.needCheckColumnPriv = needCheckColumnPriv;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
index ab039e9e9b6..1dc6902681d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileQueryScanNode.java
@@ -35,6 +35,7 @@ import org.apache.doris.common.util.BrokerUtil;
 import org.apache.doris.common.util.Util;
 import org.apache.doris.datasource.hive.source.HiveSplit;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.qe.StmtExecutor;
@@ -105,9 +106,13 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
      * These scan nodes do not have corresponding catalog/database/table info, 
so no need to do priv check
      */
     public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
-            StatisticalType statisticalType, boolean needCheckColumnPriv,
-            SessionVariable sv) {
-        super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+            ScanContext scanContext, boolean needCheckColumnPriv, 
SessionVariable sv) {
+        this(id, desc, planNodeName, StatisticalType.DEFAULT, scanContext, 
needCheckColumnPriv, sv);
+    }
+
+    public FileQueryScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
+            StatisticalType statisticalType, ScanContext scanContext, boolean 
needCheckColumnPriv, SessionVariable sv) {
+        super(id, desc, planNodeName, statisticalType, scanContext, 
needCheckColumnPriv);
         this.sessionVariable = sv;
     }
 
@@ -539,7 +544,7 @@ public abstract class FileQueryScanNode extends 
FileScanNode {
     @Override
     public int getNumInstances() {
         if (sessionVariable.isIgnoreStorageDataDistribution()) {
-            return sessionVariable.getParallelExecInstanceNum();
+            return 
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName());
         }
         return scanRangeLocations.size();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index bbae44d66a4..cbf2420cead 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -35,6 +35,7 @@ import org.apache.doris.nereids.types.DataType;
 import org.apache.doris.nereids.types.VarcharType;
 import org.apache.doris.nereids.util.TypeCoercionUtils;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TExplainLevel;
 import org.apache.doris.thrift.TExpr;
@@ -71,9 +72,14 @@ public abstract class FileScanNode extends ExternalScanNode {
     // For display pushdown agg result
     protected long tableLevelRowCount = -1;
 
-    public FileScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName, StatisticalType statisticalType,
-            boolean needCheckColumnPriv) {
-        super(id, desc, planNodeName, statisticalType, needCheckColumnPriv);
+    public FileScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
+            ScanContext scanContext, boolean needCheckColumnPriv) {
+        this(id, desc, planNodeName, StatisticalType.DEFAULT, scanContext, 
needCheckColumnPriv);
+    }
+
+    public FileScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
+            StatisticalType statisticalType, ScanContext scanContext, boolean 
needCheckColumnPriv) {
+        super(id, desc, planNodeName, statisticalType, scanContext, 
needCheckColumnPriv);
         this.needCheckColumnPriv = needCheckColumnPriv;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java
index 520f135baaa..23abb9f0b52 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/doris/source/RemoteDorisScanNode.java
@@ -31,6 +31,7 @@ import org.apache.doris.common.Pair;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
@@ -77,8 +78,9 @@ public class RemoteDorisScanNode extends FileQueryScanNode {
     private RemoteDorisSource source;
 
     public RemoteDorisScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv,
-                               SessionVariable sv) {
-        super(id, desc, "REMOTE_DORIS_SCAN_NODE", 
StatisticalType.REMOTE_DORIS_SCAN_NODE, needCheckColumnPriv, sv);
+                               SessionVariable sv, ScanContext scanContext) {
+        super(id, desc, "REMOTE_DORIS_SCAN_NODE", 
StatisticalType.REMOTE_DORIS_SCAN_NODE,
+                scanContext, needCheckColumnPriv, sv);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
index 1cdd351a22a..539efd7b7ca 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
@@ -40,6 +40,7 @@ import 
org.apache.doris.datasource.es.QueryBuilders.QueryBuilder;
 import org.apache.doris.planner.PartitionPruner;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.RangePartitionPrunerV2;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
@@ -83,8 +84,8 @@ public class EsScanNode extends ExternalScanNode {
     /**
      * For multicatalog es.
      **/
-    public EsScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
esExternalTable) {
-        super(id, desc, "EsScanNode", StatisticalType.ES_SCAN_NODE, false);
+    public EsScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
esExternalTable, ScanContext scanContext) {
+        super(id, desc, "EsScanNode", StatisticalType.ES_SCAN_NODE, 
scanContext, false);
         if (esExternalTable) {
             EsExternalTable externalTable = (EsExternalTable) 
(desc.getTable());
             table = externalTable.getEsTable();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
index 41dc478c38c..4454d48d74c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/source/HiveScanNode.java
@@ -48,6 +48,7 @@ import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.fs.DirectoryLister;
 import 
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
@@ -112,14 +113,15 @@ public class HiveScanNode extends FileQueryScanNode {
      * These scan nodes do not have corresponding catalog/database/table info, 
so no need to do priv check
      */
     public HiveScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv, SessionVariable sv,
-            DirectoryLister directoryLister) {
-        this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE, 
needCheckColumnPriv, sv, directoryLister);
+            DirectoryLister directoryLister, ScanContext scanContext) {
+        this(id, desc, "HIVE_SCAN_NODE", StatisticalType.HIVE_SCAN_NODE,
+                needCheckColumnPriv, sv, directoryLister, scanContext);
     }
 
     public HiveScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
             StatisticalType statisticalType, boolean needCheckColumnPriv, 
SessionVariable sv,
-            DirectoryLister directoryLister) {
-        super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, 
sv);
+            DirectoryLister directoryLister, ScanContext scanContext) {
+        super(id, desc, planNodeName, statisticalType, scanContext, 
needCheckColumnPriv, sv);
         hmsTable = (HMSExternalTable) desc.getTable();
         brokerName = hmsTable.getCatalog().bindBrokerName();
         this.directoryLister = directoryLister;
@@ -321,7 +323,7 @@ public class HiveScanNode extends FileQueryScanNode {
                     totalFileNum += fileCacheValue.getFiles().size();
                 }
             }
-            int parallelNum = sessionVariable.getParallelExecInstanceNum();
+            int parallelNum = 
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName());
             needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, 
numBackends, totalFileNum);
         }
 
@@ -638,4 +640,3 @@ public class HiveScanNode extends FileQueryScanNode {
         return compressType;
     }
 }
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
index 4025deff6c8..0b63d4d539a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hudi/source/HudiScanNode.java
@@ -40,6 +40,7 @@ import org.apache.doris.datasource.hudi.HudiUtils;
 import org.apache.doris.datasource.mvcc.MvccUtil;
 import org.apache.doris.fs.DirectoryLister;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -131,8 +132,9 @@ public class HudiScanNode extends HiveScanNode {
      */
     public HudiScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv,
             Optional<TableScanParams> scanParams, 
Optional<IncrementalRelation> incrementalRelation,
-            SessionVariable sv, DirectoryLister directoryLister) {
-        super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE, 
needCheckColumnPriv, sv, directoryLister);
+            SessionVariable sv, DirectoryLister directoryLister, ScanContext 
scanContext) {
+        super(id, desc, "HUDI_SCAN_NODE", StatisticalType.HUDI_SCAN_NODE,
+                needCheckColumnPriv, sv, directoryLister, scanContext);
         isCowTable = hmsTable.isHoodieCowTable();
         if (LOG.isDebugEnabled()) {
             if (isCowTable) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
index cd623528c76..13b7ee3ff68 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteGroupTask.java
@@ -288,7 +288,8 @@ public class RewriteGroupTask implements 
TransientTaskExecutor {
                 "availableBeCount must be greater than 0 for rewrite task");
 
         // 3. Get default parallelism from session variable (pipeline task num)
-        int defaultParallelism = 
connectContext.getSessionVariable().getParallelExecInstanceNum();
+        String clusterName = 
connectContext.getSessionVariable().resolveCloudClusterName(connectContext);
+        int defaultParallelism = 
connectContext.getSessionVariable().getParallelExecInstanceNum(clusterName);
 
         // 4. Determine strategy based on expected file count
         boolean useGather = false;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
index 982fe4a7e2c..8b5b4fc205d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/source/IcebergScanNode.java
@@ -46,6 +46,7 @@ import 
org.apache.doris.datasource.iceberg.profile.IcebergMetricsReporter;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
@@ -153,8 +154,8 @@ public class IcebergScanNode extends FileQueryScanNode {
 
     // for test
     @VisibleForTesting
-    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
-        super(id, desc, "ICEBERG_SCAN_NODE", 
StatisticalType.ICEBERG_SCAN_NODE, false, sv);
+    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv, ScanContext scanContext) {
+        super(id, desc, "ICEBERG_SCAN_NODE", 
StatisticalType.ICEBERG_SCAN_NODE, scanContext, false, sv);
     }
 
     /**
@@ -163,8 +164,10 @@ public class IcebergScanNode extends FileQueryScanNode {
      * eg: s3 tvf
      * These scan nodes do not have corresponding catalog/database/table info, 
so no need to do priv check
      */
-    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv, SessionVariable sv) {
-        super(id, desc, "ICEBERG_SCAN_NODE", 
StatisticalType.ICEBERG_SCAN_NODE, needCheckColumnPriv, sv);
+    public IcebergScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv, SessionVariable sv,
+            ScanContext scanContext) {
+        super(id, desc, "ICEBERG_SCAN_NODE", StatisticalType.ICEBERG_SCAN_NODE,
+                scanContext, needCheckColumnPriv, sv);
 
         ExternalTable table = (ExternalTable) desc.getTable();
         if (table instanceof HMSExternalTable) {
@@ -739,7 +742,7 @@ public class IcebergScanNode extends FileQueryScanNode {
         try (CloseableIterable<FileScanTask> fileScanTasks = 
planFileScanTask(scan)) {
             if (tableLevelPushDownCount) {
                 int needSplitCnt = countFromSnapshot < 
COUNT_WITH_PARALLEL_SPLITS
-                        ? 1 : sessionVariable.getParallelExecInstanceNum() * 
numBackends;
+                        ? 1 : 
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName()) * 
numBackends;
                 for (FileScanTask next : fileScanTasks) {
                     splits.add(createIcebergSplit(next));
                     if (splits.size() >= needSplitCnt) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
index 793dc352fee..7e5a636455d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
@@ -40,6 +40,7 @@ import org.apache.doris.datasource.ExternalFunctionRules;
 import org.apache.doris.datasource.ExternalScanNode;
 import org.apache.doris.datasource.jdbc.JdbcExternalTable;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TExplainLevel;
@@ -71,8 +72,8 @@ public class JdbcScanNode extends ExternalScanNode {
     private JdbcTable tbl;
     private long catalogId;
 
-    public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
isJdbcExternalTable) {
-        super(id, desc, "JdbcScanNode", StatisticalType.JDBC_SCAN_NODE, false);
+    public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
isJdbcExternalTable, ScanContext scanContext) {
+        super(id, desc, "JdbcScanNode", StatisticalType.JDBC_SCAN_NODE, 
scanContext, false);
         if (isJdbcExternalTable) {
             JdbcExternalTable jdbcExternalTable = (JdbcExternalTable) 
(desc.getTable());
             tbl = jdbcExternalTable.getJdbcTable();
@@ -83,8 +84,9 @@ public class JdbcScanNode extends ExternalScanNode {
         tableName = tbl.getProperRemoteFullTableName(jdbcType);
     }
 
-    public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
isTableValuedFunction, String query) {
-        super(id, desc, "JdbcScanNode", StatisticalType.JDBC_SCAN_NODE, false);
+    public JdbcScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
isTableValuedFunction, String query,
+            ScanContext scanContext) {
+        super(id, desc, "JdbcScanNode", StatisticalType.JDBC_SCAN_NODE, 
scanContext, false);
         this.isTableValuedFunction = isTableValuedFunction;
         this.query = query;
         tbl = (JdbcTable) desc.getTable();
@@ -275,7 +277,11 @@ public class JdbcScanNode extends ExternalScanNode {
 
     @Override
     public int getNumInstances() {
-        return 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+        ConnectContext context = ConnectContext.get();
+        if (context == null) {
+            return 1;
+        }
+        return 
context.getSessionVariable().getParallelExecInstanceNum(scanContext.getClusterName());
     }
 
     private static boolean shouldPushDownConjunct(TOdbcTableType tableType, 
Expr expr) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
index a5e673a6cd5..a1e287432de 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/lakesoul/source/LakeSoulScanNode.java
@@ -26,6 +26,7 @@ import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.datasource.lakesoul.LakeSoulExternalTable;
 import org.apache.doris.datasource.lakesoul.LakeSoulUtils;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -83,8 +84,9 @@ public class LakeSoulScanNode extends FileQueryScanNode {
 
     String readType;
 
-    public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv, SessionVariable sv) {
-        super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, 
needCheckColumnPriv, sv);
+    public LakeSoulScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv, SessionVariable sv,
+            ScanContext scanContext) {
+        super(id, desc, "planNodeName", StatisticalType.LAKESOUL_SCAN_NODE, 
scanContext, needCheckColumnPriv, sv);
     }
 
     @Override
@@ -285,4 +287,3 @@ public class LakeSoulScanNode extends FileQueryScanNode {
         return splits;
     }
 }
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
index 502a2c645cd..24c681c8959 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/maxcompute/source/MaxComputeScanNode.java
@@ -43,6 +43,7 @@ import 
org.apache.doris.datasource.maxcompute.source.MaxComputeSplit.SplitType;
 import 
org.apache.doris.nereids.trees.plans.logical.LogicalFileScan.SelectedPartitions;
 import org.apache.doris.nereids.util.DateUtils;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -111,15 +112,17 @@ public class MaxComputeScanNode extends FileQueryScanNode 
{
     // For new planner
     public MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc,
             SelectedPartitions selectedPartitions, boolean needCheckColumnPriv,
-            SessionVariable sv) {
+            SessionVariable sv, ScanContext scanContext) {
         this(id, desc, "MCScanNode", StatisticalType.MAX_COMPUTE_SCAN_NODE,
-                selectedPartitions, needCheckColumnPriv, sv);
+                selectedPartitions, needCheckColumnPriv, sv, scanContext);
     }
 
     private MaxComputeScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName,
-            StatisticalType statisticalType, SelectedPartitions 
selectedPartitions,
-            boolean needCheckColumnPriv, SessionVariable sv) {
-        super(id, desc, planNodeName, statisticalType, needCheckColumnPriv, 
sv);
+            StatisticalType statisticalType,
+            SelectedPartitions selectedPartitions, boolean 
needCheckColumnPriv, SessionVariable sv,
+            ScanContext scanContext) {
+        super(id, desc, planNodeName, statisticalType, scanContext,
+                needCheckColumnPriv, sv);
         table = (MaxComputeExternalTable) desc.getTable();
         this.selectedPartitions = selectedPartitions;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
index da4c96c7a59..9d8bf242823 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
@@ -30,6 +30,7 @@ import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalScanNode;
 import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TExplainLevel;
@@ -66,8 +67,8 @@ public class OdbcScanNode extends ExternalScanNode {
     /**
      * Constructs node to scan given data files of table 'tbl'.
      */
-    public OdbcScanNode(PlanNodeId id, TupleDescriptor desc, OdbcTable tbl) {
-        super(id, desc, "SCAN ODBC", StatisticalType.ODBC_SCAN_NODE, false);
+    public OdbcScanNode(PlanNodeId id, TupleDescriptor desc, OdbcTable tbl, 
ScanContext scanContext) {
+        super(id, desc, "SCAN ODBC", StatisticalType.ODBC_SCAN_NODE, 
scanContext, false);
         connectString = tbl.getConnectString();
         odbcType = tbl.getOdbcTableType();
         tblName = JdbcTable.databaseProperName(odbcType, 
tbl.getOdbcTableName());
@@ -215,7 +216,11 @@ public class OdbcScanNode extends ExternalScanNode {
 
     @Override
     public int getNumInstances() {
-        return 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+        ConnectContext context = ConnectContext.get();
+        if (context == null) {
+            return 1;
+        }
+        return 
context.getSessionVariable().getParallelExecInstanceNum(scanContext.getClusterName());
     }
 
     public static boolean shouldPushDownConjunct(TOdbcTableType tableType, 
Expr expr) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
index 3004e9dc027..5a7ecec791f 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/paimon/source/PaimonScanNode.java
@@ -37,6 +37,7 @@ import 
org.apache.doris.datasource.paimon.profile.PaimonMetricRegistry;
 import org.apache.doris.datasource.paimon.profile.PaimonScanMetricsReporter;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -148,8 +149,10 @@ public class PaimonScanNode extends FileQueryScanNode {
     public PaimonScanNode(PlanNodeId id,
                           TupleDescriptor desc,
                           boolean needCheckColumnPriv,
-                          SessionVariable sv) {
-        super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE, 
needCheckColumnPriv, sv);
+                          SessionVariable sv,
+                          ScanContext scanContext) {
+        super(id, desc, "PAIMON_SCAN_NODE", StatisticalType.PAIMON_SCAN_NODE,
+                scanContext, needCheckColumnPriv, sv);
     }
 
     @Override
@@ -404,7 +407,8 @@ public class PaimonScanNode extends FileQueryScanNode {
         // if applyCountPushdown is true, calcute row count for count pushdown
         if (applyCountPushdown && !pushDownCountSplits.isEmpty()) {
             if (pushDownCountSum > COUNT_WITH_PARALLEL_SPLITS) {
-                int minSplits = sessionVariable.getParallelExecInstanceNum() * 
numBackends;
+                int minSplits = 
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName())
+                        * numBackends;
                 pushDownCountSplits = pushDownCountSplits.subList(0, 
Math.min(pushDownCountSplits.size(), minSplits));
             } else {
                 pushDownCountSplits = 
Collections.singletonList(pushDownCountSplits.get(0));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
index 64cba7f8a4e..44d89970471 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/trinoconnector/source/TrinoConnectorScanNode.java
@@ -28,6 +28,7 @@ import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.datasource.trinoconnector.TrinoConnectorPluginLoader;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -99,9 +100,9 @@ public class TrinoConnectorScanNode extends 
FileQueryScanNode {
     private Constraint constraint;
 
     public TrinoConnectorScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv,
-            SessionVariable sv) {
-        super(id, desc, "TRINO_CONNECTOR_SCAN_NODE", 
StatisticalType.TRINO_CONNECTOR_SCAN_NODE, needCheckColumnPriv,
-                sv);
+            SessionVariable sv, ScanContext scanContext) {
+        super(id, desc, "TRINO_CONNECTOR_SCAN_NODE", 
StatisticalType.TRINO_CONNECTOR_SCAN_NODE,
+                scanContext, needCheckColumnPriv, sv);
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
index ac34ac817c4..eae0ab4f406 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/MetadataScanNode.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalScanNode;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.system.Backend;
@@ -45,8 +46,9 @@ public class MetadataScanNode extends ExternalScanNode {
     private boolean initedScanRangeLocations = false;
     private final List<TScanRangeLocations> scanRangeLocations = 
Lists.newArrayList();
 
-    public MetadataScanNode(PlanNodeId id, TupleDescriptor desc, 
MetadataTableValuedFunction tvf) {
-        super(id, desc, "METADATA_SCAN_NODE", 
StatisticalType.METADATA_SCAN_NODE, false);
+    public MetadataScanNode(PlanNodeId id, TupleDescriptor desc, 
MetadataTableValuedFunction tvf,
+            ScanContext scanContext) {
+        super(id, desc, "METADATA_SCAN_NODE", 
StatisticalType.METADATA_SCAN_NODE, scanContext, false);
         this.tvf = tvf;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
index 12db0b7c1bf..6ba1b5d4a32 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java
@@ -32,6 +32,7 @@ import org.apache.doris.datasource.FileSplit.FileSplitCreator;
 import org.apache.doris.datasource.FileSplitter;
 import org.apache.doris.datasource.TableFormatType;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.spi.Split;
 import org.apache.doris.statistics.StatisticalType;
@@ -68,8 +69,9 @@ public class TVFScanNode extends FileQueryScanNode {
      * eg: s3 tvf
      * These scan nodes do not have corresponding catalog/database/table info, 
so no need to do priv check
      */
-    public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv, SessionVariable sv) {
-        super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE, 
needCheckColumnPriv, sv);
+    public TVFScanNode(PlanNodeId id, TupleDescriptor desc, boolean 
needCheckColumnPriv, SessionVariable sv,
+            ScanContext scanContext) {
+        super(id, desc, "TVF_SCAN_NODE", StatisticalType.TVF_SCAN_NODE, 
scanContext, needCheckColumnPriv, sv);
         table = (FunctionGenTable) this.desc.getTable();
         tableValuedFunction = (ExternalFileTableValuedFunction) table.getTvf();
     }
@@ -142,7 +144,7 @@ public class TVFScanNode extends FileQueryScanNode {
         // Push down count optimization.
         boolean needSplit = true;
         if (getPushDownAggNoGroupingOp() == TPushAggOp.COUNT) {
-            int parallelNum = sessionVariable.getParallelExecInstanceNum();
+            int parallelNum = 
sessionVariable.getParallelExecInstanceNum(scanContext.getClusterName());
             int totalFileNum = fileStatuses.size();
             needSplit = FileSplitter.needSplitForCountPushdown(parallelNum, 
numBackends, totalFileNum);
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java
index 9f828847eaa..b113a3d1cc6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/cost/CostModel.java
@@ -99,7 +99,8 @@ class CostModel extends PlanVisitor<Cost, PlanContext> {
             parallelInstance = 8;
         } else {
             beNumber = Math.max(1, 
connectContext.getEnv().getClusterInfo().getBackendsNumber(true));
-            parallelInstance = Math.max(1, 
connectContext.getSessionVariable().getParallelExecInstanceNum());
+            String clusterName = 
sessionVariable.resolveCloudClusterName(connectContext);
+            parallelInstance = Math.max(1, 
sessionVariable.getParallelExecInstanceNum(clusterName));
         }
         this.hboPlanStatisticsProvider = 
Objects.requireNonNull(Env.getCurrentEnv().getHboPlanStatisticsManager()
                 .getHboPlanStatisticsProvider(), "HboPlanStatisticsProvider is 
null");
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index 0a98be0281b..dcf5e3ec8eb 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -69,7 +69,6 @@ import 
org.apache.doris.datasource.lakesoul.source.LakeSoulScanNode;
 import org.apache.doris.datasource.maxcompute.MaxComputeExternalTable;
 import org.apache.doris.datasource.maxcompute.source.MaxComputeScanNode;
 import org.apache.doris.datasource.odbc.source.OdbcScanNode;
-import org.apache.doris.datasource.paimon.PaimonExternalTable;
 import org.apache.doris.datasource.paimon.source.PaimonScanNode;
 import org.apache.doris.datasource.trinoconnector.TrinoConnectorExternalTable;
 import 
org.apache.doris.datasource.trinoconnector.source.TrinoConnectorScanNode;
@@ -649,10 +648,12 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             }
             switch (((HMSExternalTable) table).getDlaType()) {
                 case ICEBERG:
-                    scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
+                    scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv,
+                            context.getScanContext());
                     break;
                 case HIVE:
-                    scanNode = new HiveScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv, directoryLister);
+                    scanNode = new HiveScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv, directoryLister,
+                            context.getScanContext());
                     HiveScanNode hiveScanNode = (HiveScanNode) scanNode;
                     
hiveScanNode.setSelectedPartitions(fileScan.getSelectedPartitions());
                     if (fileScan.getTableSample().isPresent()) {
@@ -671,18 +672,23 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                     throw new RuntimeException("do not support DLA type " + 
((HMSExternalTable) table).getDlaType());
             }
         } else if (table instanceof IcebergExternalTable) {
-            scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
-        } else if (table instanceof PaimonExternalTable) {
-            scanNode = new PaimonScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
+            scanNode = new IcebergScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv,
+                    context.getScanContext());
+        } else if (table.getType() == TableIf.TableType.PAIMON_EXTERNAL_TABLE) 
{
+            scanNode = new PaimonScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv,
+                    context.getScanContext());
         } else if (table instanceof TrinoConnectorExternalTable) {
-            scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
+            scanNode = new TrinoConnectorScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv,
+                    context.getScanContext());
         } else if (table instanceof MaxComputeExternalTable) {
             scanNode = new MaxComputeScanNode(context.nextPlanNodeId(), 
tupleDescriptor,
-                    fileScan.getSelectedPartitions(), false, sv);
+                    fileScan.getSelectedPartitions(), false, sv, 
context.getScanContext());
         } else if (table instanceof LakeSoulExternalTable) {
-            scanNode = new LakeSoulScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
+            scanNode = new LakeSoulScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv,
+                    context.getScanContext());
         } else if (table instanceof RemoteDorisExternalTable) {
-            scanNode = new RemoteDorisScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv);
+            scanNode = new RemoteDorisScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false, sv,
+                    context.getScanContext());
         } else {
             throw new RuntimeException("do not support table type " + 
table.getType());
         }
@@ -721,7 +727,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         TableIf table = esScan.getTable();
         TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, 
context);
         EsScanNode esScanNode = new EsScanNode(context.nextPlanNodeId(), 
tupleDescriptor,
-                table instanceof EsExternalTable);
+                table instanceof EsExternalTable, context.getScanContext());
         esScanNode.setNereidsId(esScan.getId());
         context.getNereidsIdToPlanNodeIdMap().put(esScan.getId(), 
esScanNode.getId());
         Utils.execWithUncheckedException(esScanNode::init);
@@ -766,7 +772,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         PhysicalHudiScan hudiScan = (PhysicalHudiScan) fileScan;
         ScanNode scanNode = new HudiScanNode(context.nextPlanNodeId(), 
tupleDescriptor, false,
                 hudiScan.getScanParams(), hudiScan.getIncrementalRelation(), 
ConnectContext.get().getSessionVariable(),
-                directoryLister);
+                directoryLister, context.getScanContext());
         if (fileScan.getTableSnapshot().isPresent()) {
             ((FileQueryScanNode) 
scanNode).setQueryTableSnapshot(fileScan.getTableSnapshot().get());
         }
@@ -807,7 +813,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         TableIf table = jdbcScan.getTable();
         TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, 
context);
         JdbcScanNode jdbcScanNode = new JdbcScanNode(context.nextPlanNodeId(), 
tupleDescriptor,
-                table instanceof JdbcExternalTable);
+                table instanceof JdbcExternalTable, context.getScanContext());
         jdbcScanNode.setNereidsId(jdbcScan.getId());
         context.getNereidsIdToPlanNodeIdMap().put(jdbcScan.getId(), 
jdbcScanNode.getId());
         Utils.execWithUncheckedException(jdbcScanNode::init);
@@ -826,7 +832,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
         TableIf table = odbcScan.getTable();
         TupleDescriptor tupleDescriptor = generateTupleDesc(slots, table, 
context);
         OdbcScanNode odbcScanNode = new OdbcScanNode(context.nextPlanNodeId(), 
tupleDescriptor,
-                (OdbcTable) table);
+                (OdbcTable) table, context.getScanContext());
         odbcScanNode.setNereidsId(odbcScan.getId());
         context.getNereidsIdToPlanNodeIdMap().put(odbcScan.getId(), 
odbcScanNode.getId());
         Utils.execWithUncheckedException(odbcScanNode::init);
@@ -867,7 +873,8 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             generateTupleDesc(olapScan.getBaseOutputs(), olapTable, context);
         }
 
-        OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), 
tupleDescriptor, "OlapScanNode");
+        OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), 
tupleDescriptor, "OlapScanNode",
+                context.getScanContext());
         olapScanNode.setNereidsId(olapScan.getId());
         context.getNereidsIdToPlanNodeIdMap().put(olapScan.getId(), 
olapScanNode.getId());
 
@@ -1058,12 +1065,12 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
             scanNode = new 
BackendPartitionedSchemaScanNode(context.nextPlanNodeId(), table, 
tupleDescriptor,
                     schemaScan.getSchemaCatalog().orElse(null), 
schemaScan.getSchemaDatabase().orElse(null),
                     schemaScan.getSchemaTable().orElse(null),
-                    translateToExprs(schemaScan.getFrontendConjuncts(), 
context));
+                    translateToExprs(schemaScan.getFrontendConjuncts(), 
context), context.getScanContext());
         } else {
             scanNode = new SchemaScanNode(context.nextPlanNodeId(), 
tupleDescriptor,
                     schemaScan.getSchemaCatalog().orElse(null), 
schemaScan.getSchemaDatabase().orElse(null),
                     schemaScan.getSchemaTable().orElse(null), 
translateToExprs(schemaScan.getFrontendConjuncts(),
-                    context));
+                    context), context.getScanContext());
         }
         scanNode.setNereidsId(schemaScan.getId());
         context.getNereidsIdToPlanNodeIdMap().put(schemaScan.getId(), 
scanNode.getId());
@@ -1410,7 +1417,7 @@ public class PhysicalPlanTranslator extends 
DefaultPlanVisitor<PlanFragment, Pla
                 context.addExprIdSlotRefPair(consumerSlot.getExprId(), 
slotRef);
             }
         }
-        CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor);
+        CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor, 
context.getScanContext());
         translateRuntimeFilter(cteConsumer, cteScanNode, context);
         context.getCteScanNodeMap().put(multiCastFragment.getFragmentId(), 
cteScanNode);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
index dd9486367c2..7420524239d 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PlanTranslatorContext.java
@@ -44,6 +44,7 @@ import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNode;
 import org.apache.doris.planner.PlanNodeId;
 import org.apache.doris.planner.RuntimeFilterId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
@@ -67,6 +68,7 @@ import javax.annotation.Nullable;
 public class PlanTranslatorContext {
     private final ConnectContext connectContext;
     private final StatementContext statementContext;
+    private final ScanContext scanContext;
     private final List<PlanFragment> planFragments = Lists.newArrayList();
 
     private DescriptorTable descTable;
@@ -123,6 +125,11 @@ public class PlanTranslatorContext {
     public PlanTranslatorContext(CascadesContext ctx) {
         this.connectContext = ctx.getConnectContext();
         this.statementContext = ctx.getStatementContext();
+        this.scanContext = connectContext == null || 
connectContext.getSessionVariable() == null
+                ? ScanContext.EMPTY
+                : ScanContext.builder()
+                        
.clusterName(connectContext.getSessionVariable().resolveCloudClusterName(connectContext))
+                        .build();
         this.translator = new 
RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
         this.topnFilterContext = ctx.getTopnFilterContext();
         this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
@@ -133,6 +140,11 @@ public class PlanTranslatorContext {
     public PlanTranslatorContext(CascadesContext ctx, DescriptorTable 
descTable) {
         this.connectContext = ctx.getConnectContext();
         this.statementContext = ctx.getStatementContext();
+        this.scanContext = connectContext == null || 
connectContext.getSessionVariable() == null
+                ? ScanContext.EMPTY
+                : ScanContext.builder()
+                        
.clusterName(connectContext.getSessionVariable().resolveCloudClusterName(connectContext))
+                        .build();
         this.translator = new 
RuntimeFilterTranslator(ctx.getRuntimeFilterContext());
         this.topnFilterContext = ctx.getTopnFilterContext();
         this.runtimeFilterV2Context = ctx.getRuntimeFilterV2Context();
@@ -146,6 +158,7 @@ public class PlanTranslatorContext {
     public PlanTranslatorContext() {
         this.connectContext = null;
         this.statementContext = new StatementContext();
+        this.scanContext = ScanContext.EMPTY;
         this.translator = null;
         this.topnFilterContext = new TopnFilterContext();
         IdGenerator<RuntimeFilterId> runtimeFilterIdGen = 
RuntimeFilterId.createGenerator();
@@ -279,6 +292,14 @@ public class PlanTranslatorContext {
         physicalRelations.add(physicalRelation);
     }
 
+    public String getClusterName() {
+        return scanContext.getClusterName();
+    }
+
+    public ScanContext getScanContext() {
+        return scanContext;
+    }
+
     public List<PhysicalRelation> getPhysicalRelations() {
         return physicalRelations;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
index 4998e22ca0f..42999dd3532 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsLoadingTaskPlanner.java
@@ -36,6 +36,7 @@ import org.apache.doris.planner.OlapTableSink;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.thrift.TBrokerFileStatus;
@@ -195,7 +196,10 @@ public class NereidsLoadingTaskPlanner {
         }
 
         // Create a single FileLoadScanNode for all file groups
-        FileLoadScanNode fileScanNode = new FileLoadScanNode(new 
PlanNodeId(0), loadPlanInfos.get(0).getDestTuple());
+        String clusterName = ConnectContext.get() == null ? ""
+                : 
ConnectContext.get().getSessionVariable().resolveCloudClusterName();
+        FileLoadScanNode fileScanNode = new FileLoadScanNode(new 
PlanNodeId(0), loadPlanInfos.get(0).getDestTuple(),
+                ScanContext.builder().clusterName(clusterName).build());
         fileScanNode.finalizeForNereids(loadId, fileGroupInfos, contexts, 
loadPlanInfos);
         scanNodes.add(fileScanNode);
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
index 4cc7c3384e8..a1c912eede8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/load/NereidsStreamLoadPlanner.java
@@ -38,6 +38,7 @@ import org.apache.doris.planner.FileLoadScanNode;
 import org.apache.doris.planner.PlanFragment;
 import org.apache.doris.planner.PlanFragmentId;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.service.FrontendOptions;
@@ -253,7 +254,10 @@ public class NereidsStreamLoadPlanner {
         scanTupleDesc.setTable(destTable);
         NereidsLoadPlanInfoCollector.LoadPlanInfo loadPlanInfo = 
planInfoCollector.collectLoadPlanInfo(streamLoadPlan,
                 descriptorTable, scanTupleDesc);
-        FileLoadScanNode fileScanNode = new FileLoadScanNode(new 
PlanNodeId(0), loadPlanInfo.getDestTuple());
+        String clusterName = ConnectContext.get() == null ? ""
+                : 
ConnectContext.get().getSessionVariable().resolveCloudClusterName();
+        FileLoadScanNode fileScanNode = new FileLoadScanNode(new 
PlanNodeId(0), loadPlanInfo.getDestTuple(),
+                ScanContext.builder().clusterName(clusterName).build());
         fileScanNode.finalizeForNereids(loadId, 
Lists.newArrayList(fileGroupInfo), Lists.newArrayList(context),
                 Lists.newArrayList(loadPlanInfo));
         scanNode = fileScanNode;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
index 5e1346c2991..af4baee6043 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/properties/ChildrenPropertiesRegulator.java
@@ -311,9 +311,10 @@ public class ChildrenPropertiesRegulator extends 
PlanVisitor<List<List<PhysicalP
                 int prunedPartNum = candidate.getSelectedPartitionIds().size();
                 int bucketNum = 
candidate.getTable().getDefaultDistributionInfo().getBucketNum();
                 int totalBucketNum = prunedPartNum * bucketNum;
-                int backEndNum = Math.max(1, 
ConnectContext.get().getEnv().getClusterInfo()
-                        .getBackendsNumber(true));
-                int paraNum = Math.max(1, 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum());
+                ConnectContext connectContext = ConnectContext.get();
+                int backEndNum = Math.max(1, 
connectContext.getEnv().getClusterInfo().getBackendsNumber(true));
+                String clusterName = 
connectContext.getSessionVariable().resolveCloudClusterName(connectContext);
+                int paraNum = Math.max(1, 
connectContext.getSessionVariable().getParallelExecInstanceNum(clusterName));
                 return totalBucketNum < backEndNum * paraNum * 0.8;
             }
         }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
index 1227d63ebc8..dd04b5ceee8 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/SaltJoin.java
@@ -340,7 +340,9 @@ public class SaltJoin extends OneRewriteRuleFactory {
                 .getSessionVariable().skewRewriteJoinSaltExplodeFactor;
         if (factor == 0) {
             int beNumber = Math.max(1, 
connectContext.getEnv().getClusterInfo().getBackendsNumber(true));
-            int parallelInstance = Math.max(1, 
connectContext.getSessionVariable().getParallelExecInstanceNum());
+            String clusterName = 
connectContext.getSessionVariable().resolveCloudClusterName(connectContext);
+            int parallelInstance = Math.max(1,
+                    
connectContext.getSessionVariable().getParallelExecInstanceNum(clusterName));
             factor = (int) Math.min((long) beNumber * parallelInstance * 
SALT_FACTOR, Integer.MAX_VALUE);
         }
         return Math.max(factor, 1);
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
 
b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
index f7610480efa..635cf1bbac0 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/BackendPartitionedSchemaScanNode.java
@@ -96,8 +96,9 @@ public class BackendPartitionedSchemaScanNode extends 
SchemaScanNode {
     private Collection<Long> selectedPartitionIds = Lists.newArrayList();
 
     public BackendPartitionedSchemaScanNode(PlanNodeId id, TableIf table, 
TupleDescriptor desc,
-            String schemaCatalog, String schemaDatabase, String schemaTable, 
List<Expr> frontendConjuncts) {
-        super(id, desc, schemaCatalog, schemaDatabase, schemaTable, 
frontendConjuncts);
+            String schemaCatalog, String schemaDatabase, String schemaTable, 
List<Expr> frontendConjuncts,
+            ScanContext scanContext) {
+        super(id, desc, schemaCatalog, schemaDatabase, schemaTable, 
frontendConjuncts, scanContext);
         this.tableIf = table;
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java
index 1e53a9d1cb5..7f5cf61b4c6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/CTEScanNode.java
@@ -32,8 +32,8 @@ import java.util.List;
 public class CTEScanNode extends ScanNode {
     private static final PlanNodeId UNINITIAL_PLANNODEID = new PlanNodeId(-1);
 
-    public CTEScanNode(TupleDescriptor desc) {
-        super(UNINITIAL_PLANNODEID, desc, "CTEScanNode", 
StatisticalType.CTE_SCAN_NODE);
+    public CTEScanNode(TupleDescriptor desc, ScanContext scanContext) {
+        super(UNINITIAL_PLANNODEID, desc, "CTEScanNode", scanContext, 
StatisticalType.CTE_SCAN_NODE);
     }
 
     public void setPlanNodeId(PlanNodeId id) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
index aa873276d62..b754155431d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
@@ -45,8 +45,9 @@ public class DataGenScanNode extends ExternalScanNode {
 
     private DataGenTableValuedFunction tvf;
 
-    public DataGenScanNode(PlanNodeId id, TupleDescriptor desc, 
DataGenTableValuedFunction tvf) {
-        super(id, desc, "DataGenScanNode", 
StatisticalType.TABLE_VALUED_FUNCTION_NODE, false);
+    public DataGenScanNode(PlanNodeId id, TupleDescriptor desc, 
DataGenTableValuedFunction tvf,
+            ScanContext scanContext) {
+        super(id, desc, "DataGenScanNode", 
StatisticalType.TABLE_VALUED_FUNCTION_NODE, scanContext, false);
         this.tvf = tvf;
     }
 
@@ -95,8 +96,9 @@ public class DataGenScanNode extends ExternalScanNode {
     // by multi-processes or multi-threads. So we assign instance number to 1.
     @Override
     public int getNumInstances() {
-        if 
(ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) {
-            return 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+        ConnectContext context = ConnectContext.get();
+        if (context != null && 
context.getSessionVariable().isIgnoreStorageDataDistribution()) {
+            return 
context.getSessionVariable().getParallelExecInstanceNum(scanContext.getClusterName());
         }
         return 1;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
index c2e873c6613..32d74d76aa0 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/FileLoadScanNode.java
@@ -59,8 +59,8 @@ public class FileLoadScanNode extends FileScanNode {
      * External file scan node for load from file
      * These scan nodes do not have corresponding catalog/database/table info, 
so no need to do priv check
      */
-    public FileLoadScanNode(PlanNodeId id, TupleDescriptor desc) {
-        super(id, desc, "FILE_LOAD_SCAN_NODE", StatisticalType.FILE_SCAN_NODE, 
false);
+    public FileLoadScanNode(PlanNodeId id, TupleDescriptor desc, ScanContext 
scanContext) {
+        super(id, desc, "FILE_LOAD_SCAN_NODE", StatisticalType.FILE_SCAN_NODE, 
scanContext, false);
     }
 
     public void finalizeForNereids(TUniqueId loadId, 
List<NereidsFileGroupInfo> fileGroupInfos,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java
index ef95ee61e3c..b2d1c02976c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/GroupCommitScanNode.java
@@ -35,9 +35,8 @@ public class GroupCommitScanNode extends ExternalScanNode {
 
     long tableId;
 
-    public GroupCommitScanNode(PlanNodeId id, TupleDescriptor desc, long 
tableId) {
-        super(id, desc, "GROUP_COMMIT_SCAN_NODE",
-                StatisticalType.GROUP_COMMIT_SCAN_NODE, false);
+    public GroupCommitScanNode(PlanNodeId id, TupleDescriptor desc, long 
tableId, ScanContext scanContext) {
+        super(id, desc, "GROUP_COMMIT_SCAN_NODE", 
StatisticalType.GROUP_COMMIT_SCAN_NODE, scanContext, false);
         this.tableId = tableId;
     }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 24932745b95..38a56350ebd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -212,8 +212,8 @@ public class OlapScanNode extends ScanNode {
     private Column globalRowIdColumn;
 
     // Constructs node to scan given data files of table 'tbl'.
-    public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName) {
-        super(id, desc, planNodeName, StatisticalType.OLAP_SCAN_NODE);
+    public OlapScanNode(PlanNodeId id, TupleDescriptor desc, String 
planNodeName, ScanContext scanContext) {
+        super(id, desc, planNodeName, scanContext, 
StatisticalType.OLAP_SCAN_NODE);
         olapTable = (OlapTable) desc.getTable();
         distributionColumnIds = Sets.newTreeSet();
 
@@ -1091,8 +1091,9 @@ public class OlapScanNode extends ScanNode {
     public int getNumInstances() {
         // In pipeline exec engine, the instance num equals be_num * parallel 
instance.
         // so here we need count distinct be_num to do the work. make sure get 
right instance
-        if 
(ConnectContext.get().getSessionVariable().isIgnoreStorageDataDistribution()) {
-            return 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+        ConnectContext context = ConnectContext.get();
+        if (context != null && 
context.getSessionVariable().isIgnoreStorageDataDistribution()) {
+            return 
context.getSessionVariable().getParallelExecInstanceNum(scanContext.getClusterName());
         }
         return scanRangeLocations.size();
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
index be92a932c03..1b4e35d3821 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanFragment.java
@@ -228,8 +228,10 @@ public class PlanFragment extends TreeNode<PlanFragment> {
      * Assign ParallelExecNum by default value for Asynchronous request
      */
     public void setParallelExecNumIfExists() {
-        if (ConnectContext.get() != null) {
-            parallelExecNum = 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum();
+        ConnectContext context = ConnectContext.get();
+        if (context != null) {
+            String clusterName = 
context.getSessionVariable().resolveCloudClusterName(context);
+            parallelExecNum = 
context.getSessionVariable().getParallelExecInstanceNum(clusterName);
         }
     }
 
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanContext.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanContext.java
new file mode 100644
index 00000000000..875bbffa8e3
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanContext.java
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.planner;
+
+/**
+ * Shared context for scan planning/runtime decisions.
+ * <p>
+ * Keep this object immutable so scan nodes can safely cache it and
+ * we can evolve fields incrementally in future.
+ */
+public final class ScanContext {
+    public static final ScanContext EMPTY = new ScanContext("");
+
+    private final String clusterName;
+
+    private ScanContext(String clusterName) {
+        this.clusterName = clusterName == null ? "" : clusterName;
+    }
+
+    public static Builder builder() {
+        return new Builder();
+    }
+
+    public String getClusterName() {
+        return clusterName;
+    }
+
+    public static final class Builder {
+        private String clusterName = "";
+
+        public Builder clusterName(String clusterName) {
+            this.clusterName = clusterName;
+            return this;
+        }
+
+        public ScanContext build() {
+            if (clusterName == null || clusterName.isEmpty()) {
+                return ScanContext.EMPTY;
+            }
+            return new ScanContext(clusterName);
+        }
+    }
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index bb7eca6accc..cd26b28bcbd 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -74,6 +74,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -112,10 +113,18 @@ public abstract class ScanNode extends PlanNode 
implements SplitGenerator {
     // This is also important for local shuffle logic.
     // Now only OlapScanNode and FileQueryScanNode implement this.
     protected HashSet<Long> scanBackendIds = new HashSet<>();
+    // Immutable scan context used for evolving scan-related metadata.
+    protected final ScanContext scanContext;
 
-    public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, 
StatisticalType statisticalType) {
+    public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, 
ScanContext scanContext) {
+        this(id, desc, planNodeName, scanContext, StatisticalType.DEFAULT);
+    }
+
+    public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName, 
ScanContext scanContext,
+            StatisticalType statisticalType) {
         super(id, desc.getId().asList(), planNodeName, statisticalType);
         this.desc = desc;
+        this.scanContext = Objects.requireNonNull(scanContext, "scanContext 
can not be null");
     }
 
     protected List<Column> getColumns() {
@@ -702,11 +711,21 @@ public abstract class ScanNode extends PlanNode 
implements SplitGenerator {
         return selectedSplitNum;
     }
 
+    public ScanContext getScanContext() {
+        return scanContext;
+    }
+
     @Override
     public boolean isSerialOperator() {
-        return numScanBackends() <= 0 || getScanRangeNum()
-                < 
ConnectContext.get().getSessionVariable().getParallelExecInstanceNum() * 
numScanBackends()
-                || (ConnectContext.get() != null && 
ConnectContext.get().getSessionVariable().isForceToLocalShuffle());
+        ConnectContext context = ConnectContext.get();
+        if (context == null) {
+            return numScanBackends() <= 0;
+        }
+        int parallelExecInstanceNum = context.getSessionVariable()
+                .getParallelExecInstanceNum(scanContext.getClusterName());
+        return numScanBackends() <= 0
+                || getScanRangeNum() < parallelExecInstanceNum * 
numScanBackends()
+                || context.getSessionVariable().isForceToLocalShuffle();
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java 
b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
index 52932fe65d1..9b6a54a1957 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SchemaScanNode.java
@@ -66,8 +66,9 @@ public class SchemaScanNode extends ScanNode {
      * Constructs node to scan given data files of table 'tbl'.
      */
     public SchemaScanNode(PlanNodeId id, TupleDescriptor desc,
-            String schemaCatalog, String schemaDb, String schemaTable, 
List<Expr> frontendConjuncts) {
-        super(id, desc, "SCAN SCHEMA", StatisticalType.SCHEMA_SCAN_NODE);
+            String schemaCatalog, String schemaDb, String schemaTable, 
List<Expr> frontendConjuncts,
+            ScanContext scanContext) {
+        super(id, desc, "SCAN SCHEMA", scanContext, 
StatisticalType.SCHEMA_SCAN_NODE);
         this.tableName = desc.getTable().getName();
         this.schemaCatalog = schemaCatalog;
         this.schemaDb = schemaDb;
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 81c6e40eb73..34d1fce5a70 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.SetVar;
 import org.apache.doris.analysis.StatementBase;
 import org.apache.doris.analysis.StringLiteral;
 import org.apache.doris.catalog.Env;
+import org.apache.doris.cloud.qe.ComputeGroupException;
 import org.apache.doris.common.Config;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.VariableAnnotation;
@@ -3980,7 +3981,7 @@ public class SessionVariable implements Serializable, 
Writable {
         this.debugSkipFoldConstant = debugSkipFoldConstant;
     }
 
-    public int getParallelExecInstanceNum() {
+    public int getParallelExecInstanceNum(String clusterName) {
         ConnectContext connectContext = ConnectContext.get();
         if (connectContext != null && connectContext.getEnv() != null && 
connectContext.getEnv().getAuth() != null) {
             int userParallelExecInstanceNum = connectContext.getEnv().getAuth()
@@ -3989,8 +3990,12 @@ public class SessionVariable implements Serializable, 
Writable {
                 return userParallelExecInstanceNum;
             }
         }
+        String resolvedClusterName = clusterName;
+        if (Config.isCloudMode() && 
Strings.isNullOrEmpty(resolvedClusterName)) {
+            resolvedClusterName = resolveCloudClusterName(connectContext);
+        }
         if (parallelPipelineTaskNum == 0) {
-            int size = Env.getCurrentSystemInfo().getMinPipelineExecutorSize();
+            int size = 
Env.getCurrentSystemInfo().getMinPipelineExecutorSize(resolvedClusterName);
             int autoInstance = (size + 1) / 2;
             return Math.min(autoInstance, maxInstanceNum);
         } else {
@@ -3998,6 +4003,31 @@ public class SessionVariable implements Serializable, 
Writable {
         }
     }
 
+    public String resolveCloudClusterName() {
+        return resolveCloudClusterName(ConnectContext.get());
+    }
+
+    public String resolveCloudClusterName(ConnectContext connectContext) {
+        if (!Config.isCloudMode()) {
+            return "";
+        }
+        if (!Strings.isNullOrEmpty(cloudCluster)) {
+            return cloudCluster;
+        }
+        if (connectContext == null) {
+            return "";
+        }
+        try {
+            String clusterName = connectContext.getCloudCluster(false);
+            return clusterName == null ? "" : clusterName;
+        } catch (ComputeGroupException e) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("failed to resolve cloud cluster for parallel 
instance num", e);
+            }
+            return "";
+        }
+    }
+
     public boolean getEnablePreferCachedRowset() {
         ConnectContext connectContext = ConnectContext.get();
         if (connectContext != null && connectContext.getEnv() != null && 
connectContext.getEnv().getAuth() != null) {
@@ -4912,7 +4942,8 @@ public class SessionVariable implements Serializable, 
Writable {
         }
         tResult.setBeExecVersion(Config.be_exec_version);
         tResult.setEnableLocalShuffle(enableLocalShuffle);
-        tResult.setParallelInstance(getParallelExecInstanceNum());
+        String clusterName = resolveCloudClusterName();
+        tResult.setParallelInstance(getParallelExecInstanceNum(clusterName));
         tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary);
         
tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery);
         
tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin);
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java 
b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
index 71646ac6bc7..2c8d095c45f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java
@@ -336,7 +336,9 @@ public class StmtExecutor {
         builder.instancesNumPerBe(
                 beToInstancesNum.entrySet().stream().map(entry -> 
entry.getKey() + ":" + entry.getValue())
                         .collect(Collectors.joining(",")));
-        
builder.parallelFragmentExecInstance(String.valueOf(context.sessionVariable.getParallelExecInstanceNum()));
+        String clusterName = 
context.sessionVariable.resolveCloudClusterName(context);
+        builder.parallelFragmentExecInstance(
+                
String.valueOf(context.sessionVariable.getParallelExecInstanceNum(clusterName)));
         builder.traceId(context.getSessionVariable().getTraceId());
         builder.isNereids(context.getState().isNereids() ? "Yes" : "No");
         return builder.build();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java 
b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
index 59264ee6bb6..df2bac3786f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/system/SystemInfoService.java
@@ -1128,7 +1128,9 @@ public class SystemInfoService {
         return idToBackendRef;
     }
 
-    public int getMinPipelineExecutorSize() {
+    // CloudSystemInfoService override.
+    // Non-cloud ignores clusterName and calculates from all backends.
+    public int getMinPipelineExecutorSize(String clusterName) {
         List<Backend> currentBackends = null;
         try {
             currentBackends = getAllBackendsByAllCluster().values().asList();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
index 66f344f03be..c4be29b1617 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/DataGenTableValuedFunction.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.DataGenScanNode;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TDataGenFunctionName;
@@ -34,6 +35,7 @@ public abstract class DataGenTableValuedFunction extends 
TableValuedFunctionIf {
 
     @Override
     public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
-        return new DataGenScanNode(id, desc, this);
+        return new DataGenScanNode(id, desc, this,
+                
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
index e543574a561..e08eaa2c825 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java
@@ -52,6 +52,7 @@ import org.apache.doris.datasource.tvf.source.TVFScanNode;
 import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.exceptions.NotSupportedException;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.proto.InternalService;
 import org.apache.doris.proto.InternalService.PFetchTableSchemaRequest;
@@ -240,7 +241,8 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
 
     @Override
     public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
-        return new TVFScanNode(id, desc, false, sv);
+        return new TVFScanNode(id, desc, false, sv,
+                
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
     }
 
     @Override
@@ -540,4 +542,3 @@ public abstract class ExternalFileTableValuedFunction 
extends TableValuedFunctio
         }
     }
 }
-
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
index 111c3b9370d..074308bea32 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/GroupCommitTableValuedFunction.java
@@ -29,6 +29,7 @@ import org.apache.doris.nereids.exceptions.AnalysisException;
 import org.apache.doris.planner.GroupCommitPlanner;
 import org.apache.doris.planner.GroupCommitScanNode;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.ConnectContext;
 import org.apache.doris.qe.SessionVariable;
@@ -91,7 +92,8 @@ public class GroupCommitTableValuedFunction extends 
ExternalFileTableValuedFunct
 
     @Override
     public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
-        return new GroupCommitScanNode(id, desc, tableId);
+        return new GroupCommitScanNode(id, desc, tableId,
+                
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
index 040d47c7b41..e39857e4588 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/JdbcQueryTableValueFunction.java
@@ -25,6 +25,7 @@ import org.apache.doris.common.AnalysisException;
 import org.apache.doris.datasource.jdbc.JdbcExternalCatalog;
 import org.apache.doris.datasource.jdbc.source.JdbcScanNode;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.SessionVariable;
 
@@ -54,6 +55,7 @@ public class JdbcQueryTableValueFunction extends 
QueryTableValueFunction {
                 desc.getTable().getFullSchema(), TableType.JDBC);
         catalog.configureJdbcTable(jdbcTable, desc.getTable().getName());
         desc.setTable(jdbcTable);
-        return new JdbcScanNode(id, desc, true, query);
+        return new JdbcScanNode(id, desc, true, query,
+                
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
index 32be139657c..39fde6a5615 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/MetadataTableValuedFunction.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.datasource.tvf.source.MetadataScanNode;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.thrift.TMetaScanRange;
@@ -64,6 +65,7 @@ public abstract class MetadataTableValuedFunction extends 
TableValuedFunctionIf
 
     @Override
     public ScanNode getScanNode(PlanNodeId id, TupleDescriptor desc, 
SessionVariable sv) {
-        return new MetadataScanNode(id, desc, this);
+        return new MetadataScanNode(id, desc, this,
+                
ScanContext.builder().clusterName(sv.resolveCloudClusterName()).build());
     }
 }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
index 85d14585f21..85bd4677a0c 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/cloud/system/CloudSystemInfoServiceTest.java
@@ -369,7 +369,7 @@ public class CloudSystemInfoServiceTest {
 
         try {
             // Since there are no backends in the cluster, should return 1
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = infoService.getMinPipelineExecutorSize(clusterName);
             Assert.assertEquals(1, result);
         } finally {
             ConnectContext.remove();
@@ -403,7 +403,7 @@ public class CloudSystemInfoServiceTest {
 
         try {
             // Should return the pipeline executor size of the single backend
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = infoService.getMinPipelineExecutorSize(clusterName);
             Assert.assertEquals(8, result);
         } finally {
             ConnectContext.remove();
@@ -454,7 +454,7 @@ public class CloudSystemInfoServiceTest {
 
         try {
             // Should return the minimum pipeline executor size (6)
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = infoService.getMinPipelineExecutorSize(clusterName);
             Assert.assertEquals(6, result);
         } finally {
             ConnectContext.remove();
@@ -505,7 +505,7 @@ public class CloudSystemInfoServiceTest {
 
         try {
             // Should return the minimum positive pipeline executor size (4)
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = infoService.getMinPipelineExecutorSize(clusterName);
             Assert.assertEquals(4, result);
         } finally {
             ConnectContext.remove();
@@ -549,7 +549,7 @@ public class CloudSystemInfoServiceTest {
         try {
             // Should return 1 when no valid pipeline executor sizes are
             // found
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = infoService.getMinPipelineExecutorSize(clusterName);
             Assert.assertEquals(1, result);
         } finally {
             ConnectContext.remove();
@@ -565,7 +565,7 @@ public class CloudSystemInfoServiceTest {
         createTestConnectContext(null);
         try {
             // Should return 1 when no cluster is set in ConnectContext
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = infoService.getMinPipelineExecutorSize("");
             Assert.assertEquals(1, result);
         } finally {
             ConnectContext.remove();
@@ -628,7 +628,7 @@ public class CloudSystemInfoServiceTest {
 
         try {
             // Should return 8 (minimum valid size)
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = infoService.getMinPipelineExecutorSize(clusterName);
             Assert.assertEquals(8, result);
         } finally {
             ConnectContext.remove();
@@ -679,7 +679,7 @@ public class CloudSystemInfoServiceTest {
 
         try {
             // Should return 512 (minimum among large values)
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = infoService.getMinPipelineExecutorSize(clusterName);
             Assert.assertEquals(512, result);
         } finally {
             ConnectContext.remove();
@@ -715,7 +715,7 @@ public class CloudSystemInfoServiceTest {
 
         try {
             // Should return 32 (consistent across all backends)
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = infoService.getMinPipelineExecutorSize(clusterName);
             Assert.assertEquals(32, result);
         } finally {
             ConnectContext.remove();
@@ -786,7 +786,7 @@ public class CloudSystemInfoServiceTest {
 
         try {
             // Should return 8 (minimum from current cluster2), not 2 (global 
minimum from cluster1)
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = infoService.getMinPipelineExecutorSize(cluster2Name);
             Assert.assertEquals(8, result);
         } finally {
             ConnectContext.remove();
@@ -860,14 +860,14 @@ public class CloudSystemInfoServiceTest {
         try {
             // Should return 32 (minimum from virtual cluster's physical 
cluster), not 8
             // (from other cluster)
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = 
infoService.getMinPipelineExecutorSize(virtualClusterName);
             Assert.assertEquals(32, result);
 
             // Switch to other cluster
             ctx.setCloudCluster(otherClusterName);
 
             // Should return 8 (from other cluster)
-            result = infoService.getMinPipelineExecutorSize();
+            result = infoService.getMinPipelineExecutorSize(otherClusterName);
             Assert.assertEquals(8, result);
 
         } finally {
@@ -885,7 +885,7 @@ public class CloudSystemInfoServiceTest {
 
         try {
             // Should return 1 because no cluster is set (will catch 
AnalysisException)
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = infoService.getMinPipelineExecutorSize("");
             Assert.assertEquals(1, result);
 
         } finally {
@@ -958,14 +958,14 @@ public class CloudSystemInfoServiceTest {
 
         try {
             // Should return 2 (minimum from cluster1), not 16 (minimum from 
cluster2)
-            int result = infoService.getMinPipelineExecutorSize();
+            int result = infoService.getMinPipelineExecutorSize(cluster1Name);
             Assert.assertEquals(2, result);
 
             // Now switch to cluster2
             ctx.setCloudCluster(cluster2Name);
 
             // Should return 16 (minimum from cluster2), not 2 (minimum from 
cluster1)
-            result = infoService.getMinPipelineExecutorSize();
+            result = infoService.getMinPipelineExecutorSize(cluster2Name);
             Assert.assertEquals(16, result);
         } finally {
             // Clean up ConnectContext
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
index 8b1d98e509a..aee814907bc 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/FileQueryScanNodeTest.java
@@ -22,8 +22,8 @@ import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.UserException;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.SessionVariable;
-import org.apache.doris.statistics.StatisticalType;
 import org.apache.doris.thrift.TFileFormatType;
 
 import org.junit.Assert;
@@ -38,8 +38,7 @@ public class FileQueryScanNodeTest {
 
     private static class TestFileQueryScanNode extends FileQueryScanNode {
         TestFileQueryScanNode(SessionVariable sv) {
-            super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), 
"test",
-                    StatisticalType.TEST_EXTERNAL_TABLE, false, sv);
+            super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), 
"test", ScanContext.EMPTY, false, sv);
         }
 
         @Override
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
index 727ff939003..ba7687157ea 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/hive/source/HiveScanNodeTest.java
@@ -23,6 +23,7 @@ import org.apache.doris.datasource.hive.HMSExternalCatalog;
 import org.apache.doris.datasource.hive.HMSExternalTable;
 import org.apache.doris.datasource.hive.HiveMetaStoreCache;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.SessionVariable;
 
 import org.junit.Assert;
@@ -46,7 +47,7 @@ public class HiveScanNodeTest {
         Mockito.when(table.getCatalog()).thenReturn(catalog);
         Mockito.when(catalog.bindBrokerName()).thenReturn("");
         desc.setTable(table);
-        HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false, 
sv, null);
+        HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false, 
sv, null, ScanContext.EMPTY);
 
         HiveMetaStoreCache.FileCacheValue fileCacheValue = new 
HiveMetaStoreCache.FileCacheValue();
         HiveMetaStoreCache.HiveFileStatus status = new 
HiveMetaStoreCache.HiveFileStatus();
@@ -71,7 +72,7 @@ public class HiveScanNodeTest {
         Mockito.when(table.getCatalog()).thenReturn(catalog);
         Mockito.when(catalog.bindBrokerName()).thenReturn("");
         desc.setTable(table);
-        HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false, 
sv, null);
+        HiveScanNode node = new HiveScanNode(new PlanNodeId(0), desc, false, 
sv, null, ScanContext.EMPTY);
 
         HiveMetaStoreCache.FileCacheValue fileCacheValue = new 
HiveMetaStoreCache.FileCacheValue();
         HiveMetaStoreCache.HiveFileStatus status = new 
HiveMetaStoreCache.HiveFileStatus();
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
index 48031a2303e..e16ee803dca 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/iceberg/source/IcebergScanNodeTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.iceberg.source;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.SessionVariable;
 
 import org.apache.iceberg.DataFile;
@@ -37,7 +38,7 @@ public class IcebergScanNodeTest {
 
     private static class TestIcebergScanNode extends IcebergScanNode {
         TestIcebergScanNode(SessionVariable sv) {
-            super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), sv);
+            super(new PlanNodeId(0), new TupleDescriptor(new TupleId(0)), sv, 
ScanContext.EMPTY);
         }
 
         @Override
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
index 09795c53910..454f2d42d9e 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/paimon/source/PaimonScanNodeTest.java
@@ -25,6 +25,7 @@ import org.apache.doris.datasource.FileQueryScanNode;
 import org.apache.doris.datasource.FileSplitter;
 import org.apache.doris.datasource.paimon.PaimonFileExternalCatalog;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.SessionVariable;
 
 import org.apache.paimon.data.BinaryRow;
@@ -60,7 +61,7 @@ public class PaimonScanNodeTest {
     public void testSplitWeight() throws UserException {
 
         TupleDescriptor desc = new TupleDescriptor(new TupleId(3));
-        PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1), 
desc, false, sv);
+        PaimonScanNode paimonScanNode = new PaimonScanNode(new PlanNodeId(1), 
desc, false, sv, ScanContext.EMPTY);
 
         paimonScanNode.setSource(new PaimonSource());
 
@@ -387,7 +388,8 @@ public class PaimonScanNodeTest {
     public void testDetermineTargetFileSplitSizeHonorsMaxFileSplitNum() throws 
Exception {
         SessionVariable sv = new SessionVariable();
         sv.setMaxFileSplitNum(100);
-        PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new 
TupleDescriptor(new TupleId(0)), false, sv);
+        PaimonScanNode node = new PaimonScanNode(new PlanNodeId(0), new 
TupleDescriptor(new TupleId(0)),
+                false, sv, ScanContext.EMPTY);
 
         PaimonSource source = Mockito.mock(PaimonSource.class);
         
Mockito.when(source.getFileFormatFromTableProperties()).thenReturn("parquet");
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/MetadataScanNodeTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/MetadataScanNodeTest.java
index a3df5dc8a04..9700c306f51 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/MetadataScanNodeTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/MetadataScanNodeTest.java
@@ -20,6 +20,7 @@ package org.apache.doris.datasource.tvf.source;
 import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.system.Backend;
 import org.apache.doris.tablefunction.MetadataTableValuedFunction;
 import org.apache.doris.thrift.TMetaScanRange;
@@ -63,7 +64,7 @@ public class MetadataScanNodeTest {
      */
     @Test
     public void testInitedScanRangeLocationsInitialState() throws Exception {
-        MetadataScanNode scanNode = new MetadataScanNode(planNodeId, 
tupleDescriptor, mockTvf);
+        MetadataScanNode scanNode = new MetadataScanNode(planNodeId, 
tupleDescriptor, mockTvf, ScanContext.EMPTY);
 
         // Use reflection to access the private field
         Field field = 
MetadataScanNode.class.getDeclaredField("initedScanRangeLocations");
@@ -90,7 +91,7 @@ public class MetadataScanNodeTest {
 
         
Mockito.when(mockTvf.getMetaScanRange(Mockito.anyList())).thenReturn(metaScanRange);
 
-        MetadataScanNode scanNode = new MetadataScanNode(planNodeId, 
tupleDescriptor, mockTvf);
+        MetadataScanNode scanNode = new MetadataScanNode(planNodeId, 
tupleDescriptor, mockTvf, ScanContext.EMPTY);
 
         // Mock the backend policy using reflection
         mockBackendPolicy(scanNode);
@@ -128,7 +129,7 @@ public class MetadataScanNodeTest {
 
         
Mockito.when(mockTvf.getMetaScanRange(Mockito.anyList())).thenReturn(metaScanRange);
 
-        MetadataScanNode scanNode = new MetadataScanNode(planNodeId, 
tupleDescriptor, mockTvf);
+        MetadataScanNode scanNode = new MetadataScanNode(planNodeId, 
tupleDescriptor, mockTvf, ScanContext.EMPTY);
         mockBackendPolicy(scanNode);
 
         // Call getScanRangeLocations multiple times
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
index 8d591362376..8cf98daea94 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/datasource/tvf/source/TVFScanNodeTest.java
@@ -21,6 +21,7 @@ import org.apache.doris.analysis.TupleDescriptor;
 import org.apache.doris.analysis.TupleId;
 import org.apache.doris.catalog.FunctionGenTable;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.qe.SessionVariable;
 import org.apache.doris.tablefunction.ExternalFileTableValuedFunction;
 import org.apache.doris.thrift.TBrokerFileStatus;
@@ -45,7 +46,7 @@ public class TVFScanNodeTest {
         ExternalFileTableValuedFunction tvf = 
Mockito.mock(ExternalFileTableValuedFunction.class);
         Mockito.when(table.getTvf()).thenReturn(tvf);
         desc.setTable(table);
-        TVFScanNode node = new TVFScanNode(new PlanNodeId(0), desc, false, sv);
+        TVFScanNode node = new TVFScanNode(new PlanNodeId(0), desc, false, sv, 
ScanContext.EMPTY);
 
         TBrokerFileStatus status = new TBrokerFileStatus();
         status.setSize(10_000L * MB);
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
index 2e69f7e31f9..79618651714 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/HmsQueryCacheTest.java
@@ -42,6 +42,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.CreateCatalogCommand;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.qe.cache.CacheAnalyzer;
 import org.apache.doris.qe.cache.SqlCache;
@@ -227,7 +228,7 @@ public class HmsQueryCacheTest extends AnalyzeCheckTestBase 
{
 
         TupleDescriptor desc = new TupleDescriptor(new TupleId(1));
         
desc.setTable(mgr.getInternalCatalog().getDbNullable("test").getTableNullable("tbl1"));
-        olapScanNode = new OlapScanNode(new PlanNodeId(1), desc, 
"tb1ScanNode");
+        olapScanNode = new OlapScanNode(new PlanNodeId(1), desc, 
"tb1ScanNode", ScanContext.EMPTY);
     }
 
     @Test
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
index f499caa96f6..e897ddc0f15 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/qe/OlapQueryCacheTest.java
@@ -56,6 +56,7 @@ import org.apache.doris.nereids.parser.NereidsParser;
 import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
 import org.apache.doris.planner.OlapScanNode;
 import org.apache.doris.planner.PlanNodeId;
+import org.apache.doris.planner.ScanContext;
 import org.apache.doris.planner.ScanNode;
 import org.apache.doris.proto.Types;
 import org.apache.doris.qe.cache.Cache;
@@ -378,7 +379,7 @@ public class OlapQueryCacheTest {
         OlapTable table = createProfileTable();
         TupleDescriptor desc = new TupleDescriptor(new TupleId(20004));
         desc.setTable(table);
-        OlapScanNode node = new OlapScanNode(new PlanNodeId(20008), desc, 
"userprofilenode");
+        OlapScanNode node = new OlapScanNode(new PlanNodeId(20008), desc, 
"userprofilenode", ScanContext.EMPTY);
         node.setSelectedPartitionIds(selectedPartitionIds);
         return node;
     }
@@ -491,7 +492,7 @@ public class OlapQueryCacheTest {
         OlapTable table = createEventTable();
         TupleDescriptor desc = new TupleDescriptor(new TupleId(30002));
         desc.setTable(table);
-        OlapScanNode node = new OlapScanNode(new PlanNodeId(30004), desc, 
"appeventnode");
+        OlapScanNode node = new OlapScanNode(new PlanNodeId(30004), desc, 
"appeventnode", ScanContext.EMPTY);
         node.setSelectedPartitionIds(selectedPartitionIds);
         return node;
     }
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java 
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
index 0ad09096626..033568017d9 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/system/SystemInfoServiceTest.java
@@ -447,7 +447,7 @@ public class SystemInfoServiceTest {
     @Test
     public void testGetMinPipelineExecutorSize() {
         // Test case 1: No backends
-        int result = infoService.getMinPipelineExecutorSize();
+        int result = infoService.getMinPipelineExecutorSize("");
         Assert.assertEquals(1, result);
 
         // Test case 2: Single backend with pipeline executor size = 8
@@ -456,7 +456,7 @@ public class SystemInfoServiceTest {
         be1.setPipelineExecutorSize(8);
         be1.setAlive(true);
 
-        result = infoService.getMinPipelineExecutorSize();
+        result = infoService.getMinPipelineExecutorSize("");
         Assert.assertEquals(8, result);
 
         // Test case 3: Multiple backends with different pipeline executor 
sizes
@@ -470,7 +470,7 @@ public class SystemInfoServiceTest {
         be3.setPipelineExecutorSize(12);
         be3.setAlive(true);
 
-        result = infoService.getMinPipelineExecutorSize();
+        result = infoService.getMinPipelineExecutorSize("");
         Assert.assertEquals(4, result);
 
         // Test case 4: Backends with zero and negative pipeline executor 
sizes (should
@@ -485,7 +485,7 @@ public class SystemInfoServiceTest {
         be5.setPipelineExecutorSize(-1); // Should be ignored
         be5.setAlive(true);
 
-        result = infoService.getMinPipelineExecutorSize();
+        result = infoService.getMinPipelineExecutorSize("");
         Assert.assertEquals(4, result); // Still should be 4 from be2
 
         // Test case 5: All backends have zero or negative pipeline executor 
sizes
@@ -493,7 +493,7 @@ public class SystemInfoServiceTest {
         be2.setPipelineExecutorSize(-5);
         be3.setPipelineExecutorSize(0);
 
-        result = infoService.getMinPipelineExecutorSize();
+        result = infoService.getMinPipelineExecutorSize("");
         Assert.assertEquals(1, result); // Should return default value 1
 
         // Test case 6: Mix of positive and non-positive values
@@ -501,7 +501,7 @@ public class SystemInfoServiceTest {
         be2.setPipelineExecutorSize(0); // ignored
         be3.setPipelineExecutorSize(6); // This should be the minimum
 
-        result = infoService.getMinPipelineExecutorSize();
+        result = infoService.getMinPipelineExecutorSize("");
         Assert.assertEquals(6, result);
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to